diff --git a/examples/advanced/llm_hf/MULTINODE.md b/examples/advanced/llm_hf/MULTINODE.md index ef38e34758..83c4433db9 100644 --- a/examples/advanced/llm_hf/MULTINODE.md +++ b/examples/advanced/llm_hf/MULTINODE.md @@ -60,8 +60,11 @@ SLURM Job (2 nodes allocated) - **Does NOT use srun** to run the job script (only ONE FL client) #### 2. **Job Configuration** (`job.py`) -- Sends wrapper script to client: `job.to("client_wrapper.sh", site_name)` -- ScriptRunner uses simple command: `bash client_wrapper.sh` +- Uses `FedAvgRecipe` with `per_site_config` for multi-node setup +- When `--multi_node` flag is set: + - Sets command in per_site_config: `"command": "bash custom/client_wrapper.sh"` + - Adds wrapper script to job: `recipe.job.to("client_wrapper.sh", site_name)` +- Script arguments passed via `"train_args"` in per_site_config - No need to handle environment variables in Python #### 3. **Wrapper Script** (`client_wrapper.sh`) @@ -73,7 +76,9 @@ SLURM Job (2 nodes allocated) - Uses `CUDA_VISIBLE_DEVICES` to set GPUs. Assumes that they are set as comma-separated list, e.g. "0,1,2,3,4,5,6,7". **Why this works:** -- The wrapper script is included in the FL job package +- The wrapper script is included in the FL job package via `recipe.job.to("client_wrapper.sh", site_name)` +- It's placed in the `custom/` subdirectory of the job workspace +- Command is set to `bash custom/client_wrapper.sh` in the per_site_config - It runs in the same environment as the SLURM job (has access to `srun` and SLURM variables) - It handles all the complexity of multi-node coordination @@ -95,17 +100,23 @@ SLURM Job (2 nodes allocated) export MASTER_ADDR=$(scontrol show hostnames $SLURM_JOB_NODELIST | head -n 1) export MASTER_PORT=29500 # Start NVFlare server and client - python3 job.py --client_ids dolly --gpu "[0,1,2,3,4,5,6,7]" ... + python3 job.py --client_ids dolly --data_path ${PWD}/dataset \ + --gpu "[0,1,2,3,4,5,6,7]" --multi_node \ + --workspace_dir ${PWD}/workspace --job_dir ${PWD}/jobs ``` 3. **`job.py` creates and exports FL job**: - - Includes `client_wrapper.sh` in job package - - Includes `client.py` training script - - ScriptRunner configured to run: `bash client_wrapper.sh` + - Uses `FedAvgRecipe` to configure the federated learning job + - For multi-node mode (`--multi_node` flag): + - Sets command via `per_site_config`: `"command": "bash custom/client_wrapper.sh"` + - Adds wrapper script: `recipe.job.to("client_wrapper.sh", site_name)` + - Includes `client.py` training script automatically via recipe + - Exports job configuration to specified directory 4. **NVFlare client receives training task**: - - Extracts job files to workspace - - Executes: `bash client_wrapper.sh client.py --args...` + - Extracts job files to workspace (including `custom/client_wrapper.sh`) + - Executes command from per_site_config: `bash custom/client_wrapper.sh` + - Wrapper script receives training script path and arguments 5. 
**Wrapper script detects multi-node setup**: ```bash @@ -125,27 +136,83 @@ SLURM Job (2 nodes allocated) ## Current Solution Advantages -✅ **Separation of concerns**: Job creation vs execution -✅ **Environment isolation**: Wrapper script runs in correct environment -✅ **Flexibility**: Works for both single-node and multi-node -✅ **Simplicity**: No complex string escaping or variable expansion -✅ **Debugging**: Wrapper script provides clear logging +✅ **Recipe Pattern**: Uses `FedAvgRecipe` for maintainable configuration +✅ **Separation of concerns**: Job creation vs execution +✅ **Environment isolation**: Wrapper script runs in correct environment +✅ **Flexibility**: Works for both single-node and multi-node via `--multi_node` flag +✅ **Per-Site Configuration**: Different commands and arguments per client via `per_site_config` +✅ **Simplicity**: No complex string escaping or variable expansion +✅ **Debugging**: Wrapper script provides clear logging ✅ **Portability**: Easy to modify for different cluster setups +## Job Configuration Arguments + +The `job.py` script accepts several arguments relevant to multi-node training: + +**Required:** +- `--client_ids`: Client/site names, space-separated (e.g., `dolly` or `hospital-1`). Used directly as site names. +- `--data_path`: Root directory containing client datasets +- `--multi_node`: Flag to enable multi-node training mode + +**Optional:** +- `--gpu`: GPU assignments (e.g., `"[0,1,2,3,4,5,6,7]"` for 8 GPUs) +- `--num_rounds`: Number of FL rounds (default: 3) +- `--local_epoch`: Local training epochs per round (default: 1) +- `--lr_scheduler`: Learning rate scheduler (default: "constant") +- `--train_mode`: Training mode - `SFT` or `PEFT` (default: "SFT") +- `--message_mode`: Communication format - `numpy` or `tensor` (default: "numpy") +- `--quantize_mode`: Model quantization for communication (e.g., `float16`, `blockwise8`) +- `--wandb_project`: WandB project name for experiment tracking +- `--wandb_run_name`: WandB run name +- `--use_tracking`: Enable TensorBoard tracking + +**Example with all key arguments:** +```bash +python3 job.py \ + --client_ids dolly \ + --data_path ${PWD}/dataset \ + --multi_node \ + --gpu "[0,1,2,3,4,5,6,7]" \ + --num_rounds 5 \ + --local_epoch 2 \ + --lr_scheduler cosine_with_restarts \ + --train_mode SFT \ + --message_mode tensor \ + --wandb_project my_llm_project \ + --workspace_dir ${PWD}/workspace \ + --job_dir ${PWD}/jobs +``` + ## Testing ### Single Node (8 GPUs) +When testing with a single node, you can either: + +**Option 1: Without `--multi_node` flag** (uses torchrun directly, no wrapper): +```bash +python3 job.py --client_ids dolly --data_path ./dataset \ + --gpu "[0,1,2,3,4,5,6,7]" \ + --workspace_dir ./workspace --job_dir ./jobs +``` + +**Option 2: With `--multi_node` flag** (uses wrapper script): ```bash sbatch --nodes=1 --gpus-per-node=8 nvflare.slurm ``` -Wrapper script detects `NNODES=1` and uses torchrun directly. +Wrapper script detects `NNODES=1` and uses torchrun directly (no srun). ### Multi-Node (2 nodes, 16 GPUs) +**Required: Must use `--multi_node` flag** ```bash sbatch --nodes=2 --gpus-per-node=8 nvflare.slurm ``` Wrapper script detects `NNODES=2` and uses srun + torchrun. 
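+
+Concretely, the `--multi_node` flag only changes which launch command `job.py` writes into `per_site_config`. Below is a self-contained sketch that mirrors that branch of `job.py`; the example values (`site_name`, `site_gpus`, `port`, the truncated `script_args`) are filled in purely for illustration, and the actual script builds them per client inside a loop:
+
+```python
+# Sketch: how job.py picks the per-site launch command (names follow job.py's per-client loop)
+site_name = "dolly"
+site_gpus = ["0", "1", "2", "3", "4", "5", "6", "7"]  # GPUs assigned to this client
+port = "7777"                                          # master port for torchrun
+script_args = "--model_name_or_path meta-llama/llama-3.2-1b ..."  # built per client in job.py
+multi_node = True                                      # set by the --multi_node flag
+
+per_site_config = {}
+site_config = {"train_args": script_args}
+if multi_node:
+    # Multi-node: delegate srun/torchrun coordination to the wrapper script shipped in the job package
+    site_config["command"] = "bash custom/client_wrapper.sh"
+elif len(site_gpus) > 1:
+    # Single-node multi-GPU: launch torchrun directly
+    site_config["command"] = (
+        f"python3 -m torch.distributed.run --nnodes=1 --nproc_per_node={len(site_gpus)} --master_port={port}"
+    )
+# Single GPU without --multi_node: no "command" is set, so the recipe's default launch is used
+per_site_config[site_name] = site_config
+print(per_site_config)
+```
+
+At runtime it is then the wrapper script, not `job.py`, that decides between plain torchrun and srun + torchrun based on `NNODES`.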
+**The `--multi_node` flag is critical** - it tells `job.py` to: +- Use `client_wrapper.sh` instead of direct torchrun +- Include the wrapper script in the job package +- Set the correct command in the job configuration + ## Troubleshooting ### Check wrapper script execution @@ -217,9 +284,11 @@ In total X params to be sent to server. ## Key Principles for Multi-Node NVFlare + PyTorch DDP 1. **One FL Client Per Multi-node Cluster**: Only one NVFlare client process is needed per cluster, and it should run on the SLURM master node. -2. **Rank 0 Only for FL Operations**: Only global rank 0 talks to FL server -3. **Local Rank for GPU Selection**: Use local_rank (0-7) for `cuda:X` device mapping -4. **Global Rank for FL Communication**: Use rank (0-15) for NVFlare API calls -5. **Broadcast Coordination**: Rank 0 broadcasts FL state to all other ranks -6. **Shared Filesystem**: Only rank 0 saves checkpoints (avoid conflicts) -7. **Wrapper Script Pattern**: Separate job creation from execution environment +2. **Use `--multi_node` Flag**: Must be set in `job.py` to enable wrapper script and correct command configuration +3. **Recipe-Based Configuration**: Use `FedAvgRecipe` with `per_site_config` for flexible job setup +4. **Rank 0 Only for FL Operations**: Only global rank 0 talks to FL server +5. **Local Rank for GPU Selection**: Use local_rank (0-7) for `cuda:X` device mapping +6. **Global Rank for FL Communication**: Use rank (0-15) for NVFlare API calls +7. **Broadcast Coordination**: Rank 0 broadcasts FL state to all other ranks +8. **Shared Filesystem**: Only rank 0 saves checkpoints (avoid conflicts) +9. **Wrapper Script Pattern**: Separate job creation from execution environment via `client_wrapper.sh` diff --git a/examples/advanced/llm_hf/README.md b/examples/advanced/llm_hf/README.md index f736335693..2d807eefc4 100644 --- a/examples/advanced/llm_hf/README.md +++ b/examples/advanced/llm_hf/README.md @@ -40,6 +40,146 @@ python ./utils/preprocess_alpaca.py --training_file dataset/alpaca/data/train-00 python ./utils/preprocess_oasst1.py --training_file dataset/oasst1/data/train-00000-of-00001-b42a775f407cee45.parquet --validation_file dataset/oasst1/data/validation-00000-of-00001-134b8fd0c89408b6.parquet --output_dir dataset/oasst1 ``` +## Implementation Overview + +This implementation uses NVFlare's recipe-based pattern for federated learning with HuggingFace LLMs. 
Below is an overview of the key components: + +### Data +- **Datasets**: Three public instruction-tuning datasets (Dolly, Alpaca, OASST1) +- **Format**: JSONL files with `input` and `output` fields for instruction tuning +- **Preprocessing**: Each dataset is split into `training.jsonl` and `validation.jsonl` +- **Client Distribution**: Each client gets its own dataset directory (e.g., `dataset/dolly/`, `dataset/alpaca/`) + +### Model +The example supports two model definition files for different training modes: + +**`hf_sft_model.py` (Supervised Fine-Tuning)** +```python +class CausalLMModel(torch.nn.Module): + def __init__(self, model_name_or_path): + super(CausalLMModel, self).__init__() + self.model = AutoModelForCausalLM.from_pretrained(model_name_or_path) +``` + +**`hf_peft_model.py` (Parameter-Efficient Fine-Tuning)** +```python +class CausalLMPEFTModel(torch.nn.Module): + def __init__(self, model_name_or_path): + super(CausalLMPEFTModel, self).__init__() + peft_config = LoraConfig(lora_alpha=16, lora_dropout=0.1, r=64, + bias="none", task_type="CAUSAL_LM") + full_model = AutoModelForCausalLM.from_pretrained(model_name_or_path) + self.model = get_peft_model(full_model, peft_config) +``` + +### Client-Side Code +**`client.py`** - Federated client using HuggingFace SFTTrainer with DDP support + +Key features: +- **Multi-GPU Support**: Automatic DDP setup via `torch.distributed` +- **Rank Management**: Only rank 0 communicates with NVFlare server +- **Model Synchronization**: Broadcasts global model from rank 0 to all ranks +- **Federated Training Loop**: Integrates with NVFlare using numbered steps: + 1. Import nvflare client API + 2. Initialize NVFlare client API (`flare.init()`) + 3. Federated training rounds loop (`while flare.is_running()`) + 4. Receive global model from NVFlare (`flare.receive()`) + 5. Load global model state dict + 6. Evaluate global model for server-side model selection + 7. Train locally using SFTTrainer + 8. Compose output model parameters + 9. Construct trained FL model with metrics + 10. 
Send model back to NVFlare (`flare.send()`) + +**Launch Modes:** +- Single GPU: `python client.py [args]` +- Multi-GPU: `python -m torch.distributed.run --nnodes=1 --nproc_per_node=N --master_port=7777 client.py [args]` +- Multi-node: via `client_wrapper.sh` + +### Server-Side Code / Job Recipe +**`job.py`** - Job configuration using NVFlare's `FedAvgRecipe` pattern + +**Recipe-Based Approach:** +```python +# Create recipe with FedAvgRecipe +recipe = FedAvgRecipe( + name=job_name, + initial_model=initial_model, # CausalLMModel or CausalLMPEFTModel + min_clients=num_clients, + num_rounds=args.num_rounds, + train_script="client.py", + server_expected_format=server_expected_format, # "pytorch" or "numpy" + launch_external_process=True, + per_site_config=per_site_config, # Site-specific configurations +) +``` + +**Per-Site Configuration:** +Each client can have custom configurations for different data paths and multi-GPU setups: +```python +per_site_config = { + "dolly": { + "train_args": "--model_name_or_path meta-llama/llama-3.2-1b " + "--data_path_train ./dataset/dolly/training.jsonl " + "--data_path_valid ./dataset/dolly/validation.jsonl ...", + "command": "python3 -m torch.distributed.run --nnodes=1 " + "--nproc_per_node=2 --master_port=7777" + }, + "alpaca": { + "train_args": "--model_name_or_path meta-llama/llama-3.2-1b " + "--data_path_train ./dataset/alpaca/training.jsonl ...", + "command": "python3 -m torch.distributed.run --nnodes=1 " + "--nproc_per_node=2 --master_port=8888" + } +} +``` + +**Optional Features:** +- **Quantization**: Add ModelQuantizer and ModelDequantizer filters for communication efficiency +- **Experiment Tracking**: Enable TensorBoard tracking with `--use_tracking` +- **Extended Timeouts**: Automatic configuration for long-running LLM training + +### Run Job +The recipe supports multiple execution modes: + +**1. Export Only** (generate job config without running): +```bash +python job.py \ + --client_ids dolly \ + --data_path ${PWD}/dataset \ + --job_dir ${PWD}/workspace/jobs/job_config \ + --export_config +``` + +**2. Simulation Mode** (local testing): +```bash +python job.py \ + --client_ids dolly \ + --data_path ${PWD}/dataset \ + --workspace_dir ${PWD}/workspace/simulation \ + --job_dir ${PWD}/workspace/jobs/simulation +``` + +**3. Production Mode** (real deployment): +```bash +python job.py \ + --client_ids dolly \ + --data_path ${PWD}/dataset \ + --startup_kit_location /path/to/startup_kit \ + --username admin@nvidia.com +``` + +**Key Job Arguments:** +- `--client_ids`: Client/site names (space-separated). Used directly as site names (e.g., `dolly`, `hospital-1`) +- `--data_path`: Root directory containing client datasets +- `--train_mode`: `SFT` or `PEFT` +- `--message_mode`: `numpy` (float32) or `tensor` (bf16) +- `--quantize_mode`: Optional quantization (`float16`, `blockwise8`, `float4`, `normfloat4`) +- `--gpu`: GPU assignments, e.g., `"[0,1],[2,3]"` for two clients with 2 GPUs each +- `--ports`: Master ports for DDP, e.g., `7777 8888` +- `--num_rounds`: Number of federated learning rounds +- `--use_tracking`: Enable TensorBoard experiment tracking + ## Adaptation of Centralized Training Script to Federated Below, we illustrate how to adapt a standard HuggingFace SFT/PEFT training script to a federated paradigm with NVFlare. @@ -247,47 +387,5 @@ Alpaca: Oasst1: ![peft](./figs/peft_oasst1.png) - ## Multi-node Training The NVFlare client can run in a multi-node environment as well. The deployment depends on your cluster environment. 
We provide an example on how to test this with a SLURM-based cluster. See the details and some findings on ensuring the job runs correctly in multi-node setting in [MULTINODE.md](MULTINODE.md). - -### 1. Create a fresh virtual environment on your cluster -Create a fresh virtual environment on your cluster and install the requrements. -```bash -export VENV_DIR= -``` - -### 2. Create a NVFlare project -As an example, we create a project with only one client for the Dolly dataset. -```bash -nvflare poc prepare -c site-dolly -``` -Copy the created "prod_00" where your SLURM job can access it, i.e., a shared file system. - -```bash -export NVFLARE_PROJECT= -``` - -### 3. (Optionally) Set your Weights and Biases API Key -The training can be logged to WandB if you provide and API key via - -```bash -export WANDB_API_KEY= -``` - -### 4. Submit the SLURM Job - -Update your SLURM account name and partitions by providing the information in [nvflare.slurm](nvflare.slurm): - -``` -#SBATCH -A [ACCOUNT_NAME] -#SBATCH --partition=[PARTITION_NAME1,PARTITION_NAME2,...] -``` - -By default, you can submit a job, requesting 2 nodes with 8 GPUs via - -```bash -sbatch nvflare.slurm -``` - -For more options, see [MULTINODE.md](MULTINODE.md#testing). diff --git a/examples/advanced/llm_hf/client.py b/examples/advanced/llm_hf/client.py index 55ffc05fc0..db142d3a4b 100755 --- a/examples/advanced/llm_hf/client.py +++ b/examples/advanced/llm_hf/client.py @@ -12,6 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +""" +HuggingFace LLM client for multi-GPU federated learning. + +Supports both SFT (Supervised Fine-Tuning) and PEFT (Parameter-Efficient Fine-Tuning). +Launch with: + - Single GPU: python client.py [args] + - Multi GPU: python -m torch.distributed.run --nnodes=1 --nproc_per_node=N --master_port=7777 client.py [args] + - Multi-node: via client_wrapper.sh +""" + import argparse import copy import os @@ -28,11 +38,14 @@ from transformers import AutoModelForCausalLM, TrainerCallback, trainer_utils from trl import SFTConfig, SFTTrainer +# (1) import nvflare client API import nvflare.client as flare # Add callback to stop at each epoch class StopCallback(TrainerCallback): + """Callback to stop training after each epoch for federated learning.""" + def on_epoch_end(self, args, state, control, logs=None, **kwargs): control.should_training_stop = True @@ -44,20 +57,25 @@ def on_epoch_end(self, args, state, control, logs=None, **kwargs): def format_instruction(example): + """Format training examples for instruction tuning.""" return f"### Instruction: Generate Output according to the information and question given by Input. ### Input:{example['input']} ### Response: {example['output']}" def setup_distributed_training(): - """Setup distributed training environment.""" + """Setup distributed training environment. 
+ + Returns: + tuple: (rank, world_size, local_rank) + """ if "RANK" in os.environ and "WORLD_SIZE" in os.environ: rank = int(os.environ["RANK"]) world_size = int(os.environ["WORLD_SIZE"]) local_rank = int(os.environ.get("LOCAL_RANK", 0)) - # Set device + # Set device for DDP torch.cuda.set_device(local_rank) - print(f"Distributed training initialized: rank={rank}, world_size={world_size}, local_rank={local_rank}") + print(f"DDP rank {rank} initialized: world_size={world_size}, local_rank={local_rank}") return rank, world_size, local_rank else: print("No distributed training environment detected, running in single GPU mode") @@ -107,9 +125,14 @@ def main(): parser.add_argument("--local_epoch", type=int, default=1) parser.add_argument("--num_rounds", type=int, default=3) parser.add_argument( - "--wandb_project", type=str, default="fl_exp", help="WandB project name (enables WandB if provided)" + "--wandb_project", + type=str, + default="nvflare_llm", + help="WandB project name (default: nvflare_llm). WandB is enabled if WANDB_API_KEY is set.", + ) + parser.add_argument( + "--wandb_run_name", type=str, default="nvflare_llm", help="WandB run name, default to nvflare_llm" ) - parser.add_argument("--wandb_run_name", type=str, default="multinode", help="WandB run name") args = parser.parse_args() # Setup distributed training @@ -120,11 +143,10 @@ def main(): device_map = {"": local_rank} # Set WandB environment variables (only if API key is set and rank 0 will actually log) - wandb_enabled = args.wandb_project and os.environ.get("WANDB_API_KEY") + wandb_enabled = bool(os.environ.get("WANDB_API_KEY")) if wandb_enabled: os.environ["WANDB_PROJECT"] = args.wandb_project - if args.wandb_run_name: - os.environ["WANDB_NAME"] = args.wandb_run_name + os.environ["WANDB_NAME"] = args.wandb_run_name # Add FL-specific tags os.environ["WANDB_TAGS"] = ( f"nvflare,multi-node,{world_size}gpus,{os.environ.get('SLURM_JOB_NUM_NODES', '1')}nodes" @@ -134,7 +156,7 @@ def main(): elif rank == 0: print("Rank 0: WandB disabled (WANDB_API_KEY not set), using TensorBoard for logging") - # initializes NVFlare client API + # (2) Initialize NVFlare client API # IMPORTANT: Only global rank 0 should interact with NVFlare # In multi-node training, rank 0 is on the master node where the FL client runs flare.init(rank=rank) @@ -252,10 +274,10 @@ def main(): callbacks=[StopCallback()], ) - # Train federated rounds - # start with global model at the beginning of each round + # (3) Train federated rounds + # Start with global model at the beginning of each round while flare.is_running(): - # receives golobal model from NVFlare (only on rank 0) + # (4) Receive global model from NVFlare (only on rank 0) if rank == 0: print("Rank 0: Waiting to receive model from FL server...") input_model = flare.receive(timeout=600) @@ -266,7 +288,7 @@ def main(): dist.broadcast_object_list(stop_signal, src=0) break curr_round = input_model.current_round - print(f"Rank 0: Received model for round {curr_round}") + print(f"Rank 0: Received model for round {curr_round}, Site={flare.get_site_name()}") # Update the key name received from global model if using model def file global_model = copy.deepcopy(input_model.params) for key in list(global_model.keys()): @@ -275,7 +297,7 @@ def main(): curr_round = None global_model = None - # broadcast current round and global_model to all processes + # Broadcast current round and global_model to all processes if dist.is_initialized(): curr_round_list = [curr_round] global_model_list = [global_model] @@ -284,27 
+306,32 @@ def main(): curr_round = curr_round_list[0] global_model = global_model_list[0] + # Sync all processes before loading model if dist.is_initialized(): dist.barrier() - # Load state dict + # (5) Load global model state dict if train_mode: set_peft_model_state_dict(trainer.model, global_model) else: trainer.model.load_state_dict(global_model) - # Wait for main process to finish model loading + + # Wait for all processes to finish model loading if dist.is_initialized(): dist.barrier() - # Evaluate the global model + # (6) Evaluate the global model for server-side model selection eval_loss = trainer.evaluate() eval_loss = float(eval_loss["eval_loss"]) + if rank == 0: + print(f"Rank 0: Global model eval_loss: {eval_loss}") - # Train + # (7) Train locally if curr_round == 0: # First round, start from pretrained model for epoch in range(args.local_epoch): - print(f"Training local epoch {epoch + 1}/{args.local_epoch}") + if rank == 0: + print(f"Rank 0: Training local epoch {epoch + 1}/{args.local_epoch}") # train for one epoch if epoch == 0: trainer.train() @@ -312,7 +339,7 @@ def main(): # continue training trainer.train(resume_from_checkpoint=True) else: - # replace local resume weights with global weights (only on global rank 0) + # Replace local resume weights with global weights (only on global rank 0) # Use rank (not local_rank) since all nodes share the same filesystem if rank == 0: resume_from_checkpoint_folder = trainer_utils.get_last_checkpoint(trainer.args.output_dir) @@ -331,17 +358,18 @@ def main(): if dist.is_initialized(): dist.barrier() - # continue training - # as we used callback, no need to increment num_train_epochs + # Continue training from checkpoint + # As we used callback, no need to increment num_train_epochs for epoch in range(args.local_epoch): - print(f"Training local epoch {epoch + 1}/{args.local_epoch}") + if rank == 0: + print(f"Rank 0: Training local epoch {epoch + 1}/{args.local_epoch}") trainer.train(resume_from_checkpoint=True) - # Wait for all process to finish training before continuing + # Wait for all processes to finish training before continuing if dist.is_initialized(): dist.barrier() - # compose output model to send back to server (only on rank 0) + # (8) Compose output model to send back to server (only on rank 0) if rank == 0: if train_mode: # PEFT, load PEFT part from trainer model @@ -350,26 +378,25 @@ def main(): # SFT, load whole model state_dict out_param = trainer.model.state_dict() - # update the key name sent to global model + # Update the key name sent to global model if not train_mode: for key in list(out_param.keys()): out_param["model." 
+ key] = out_param.pop(key).cpu() if args.message_mode.lower() == "numpy": - # cast out_param to float32 preparing for communication with numpy - # otherwise do nothing + # Cast out_param to float32 preparing for communication with numpy out_param = {k: v.to(torch.float32) for k, v in out_param.items()} - # print the dict size - print(f"In total {len(out_param.keys())} params to be sent to server.") + # Print the dict size + print(f"Rank 0: Sending {len(out_param.keys())} params to server.") - # construct trained FL model + # (9) Construct trained FL model output_model = flare.FLModel( params=out_param, metrics={"eval_loss": eval_loss}, meta={"NUM_STEPS_CURRENT_ROUND": trainer.train_dataset.num_rows}, ) - # send model back to NVFlare + # (10) Send model back to NVFlare flare.send(output_model) # Cleanup distributed training environment diff --git a/examples/advanced/llm_hf/hf_peft_model.py b/examples/advanced/llm_hf/hf_peft_model.py index b2545864c7..efa9488f10 100755 --- a/examples/advanced/llm_hf/hf_peft_model.py +++ b/examples/advanced/llm_hf/hf_peft_model.py @@ -20,6 +20,7 @@ class CausalLMPEFTModel(torch.nn.Module): def __init__(self, model_name_or_path): super(CausalLMPEFTModel, self).__init__() + self.model_name_or_path = model_name_or_path # PEFT configs peft_config = LoraConfig( lora_alpha=16, diff --git a/examples/advanced/llm_hf/hf_sft_model.py b/examples/advanced/llm_hf/hf_sft_model.py index fd84c3f06a..e471ffd09e 100755 --- a/examples/advanced/llm_hf/hf_sft_model.py +++ b/examples/advanced/llm_hf/hf_sft_model.py @@ -19,6 +19,7 @@ class CausalLMModel(torch.nn.Module): def __init__(self, model_name_or_path): super(CausalLMModel, self).__init__() + self.model_name_or_path = model_name_or_path self.model = AutoModelForCausalLM.from_pretrained( model_name_or_path, ) diff --git a/examples/advanced/llm_hf/job.py b/examples/advanced/llm_hf/job.py index 263346ded6..df6e9aec7f 100644 --- a/examples/advanced/llm_hf/job.py +++ b/examples/advanced/llm_hf/job.py @@ -12,298 +12,226 @@ # See the License for the specific language governing permissions and # limitations under the License. +""" +Job configuration for LLM HuggingFace federated learning using FedAvgRecipe pattern. +""" + import argparse import os +from typing import Dict -from nvflare import FedJob, FilterType -from nvflare.app_common.widgets.intime_model_selector import IntimeModelSelector -from nvflare.app_common.workflows.fedavg import FedAvg -from nvflare.app_opt.pt.file_model_persistor import PTFileModelPersistor from nvflare.app_opt.pt.quantization.dequantizer import ModelDequantizer from nvflare.app_opt.pt.quantization.quantizer import ModelQuantizer -from nvflare.job_config.script_runner import ScriptRunner +from nvflare.app_opt.pt.recipes.fedavg import FedAvgRecipe from nvflare.private.fed.utils.fed_utils import split_gpus -from nvflare.recipe import ProdEnv, SimEnv -from nvflare.recipe.spec import Recipe +from nvflare.recipe import ProdEnv, SimEnv, add_experiment_tracking + + +def define_parser(): + parser = argparse.ArgumentParser() + parser.add_argument( + "--client_ids", + nargs="+", + type=str, + default="", + help="Client/site names (space-separated). 
Used directly as site names and for data paths (e.g., 'dolly', 'hospital-1').", + ) + parser.add_argument("--num_rounds", type=int, default=3, help="Number of FL rounds") + parser.add_argument( + "--workspace_dir", + type=str, + default="/tmp/nvflare/jobs/llm_hf/workdir", + help="Work directory for simulator runs", + ) + parser.add_argument( + "--job_dir", + type=str, + default="/tmp/nvflare/jobs/llm_hf/jobdir", + help="Directory for job export", + ) + parser.add_argument("--model_name_or_path", type=str, default="meta-llama/llama-3.2-1b", help="Model name or path") + parser.add_argument("--data_path", type=str, default="", help="Root directory for training and validation data") + parser.add_argument("--train_mode", type=str, default="SFT", help="Training mode, SFT or PEFT") + parser.add_argument("--quantize_mode", type=str, default=None, help="Quantization mode, default None") + parser.add_argument("--message_mode", type=str, default="numpy", help="Message mode: numpy or tensor") + parser.add_argument("--local_epoch", type=int, default=1, help="Number of local training epochs per round") + parser.add_argument("--lr_scheduler", type=str, default="constant", help="Learning rate scheduler type") + parser.add_argument("--threads", type=int, help="Number of threads for FL simulation") + parser.add_argument( + "--gpu", + type=str, + default="0", + help="GPU assignments for simulated clients, comma separated, default single GPU", + ) + parser.add_argument("--ports", nargs="+", default=["7777"], help="Ports for clients, default to 7777") + parser.add_argument("--multi_node", action="store_true", help="Enable multi-node training") + parser.add_argument("--startup_kit_location", type=str, default=None, help="Startup kit location") + parser.add_argument("--username", type=str, default="admin@nvidia.com", help="Username for production mode") + parser.add_argument( + "--wandb_project", type=str, default="nvflare_llm", help="WandB project name (default: nvflare_llm)" + ) + parser.add_argument( + "--wandb_run_name", type=str, default="nvflare_llm", help="WandB run name (default: nvflare_llm)" + ) + parser.add_argument("--use_tracking", action="store_true", help="Enable TensorBoard tracking") + parser.add_argument("--export_config", action="store_true", help="Export job config only") + return parser.parse_args() -def main(args): - train_script = "client.py" +def main(): + print("Starting llm_hf recipe job...") + args = define_parser() + print("args:", args) + client_ids = args.client_ids + if not client_ids: + raise ValueError("client_ids cannot be empty. 
Please specify at least one client ID.") num_clients = len(client_ids) - # get the GPU assignments and ports gpus = split_gpus(args.gpu) gpus = [g.split(",") for g in gpus] - ports = args.ports + ports = args.ports if isinstance(args.ports, list) else [args.ports] print(f"Clients: {client_ids}, GPUs: {gpus}, ports: {ports}") - # make sure the number of GPUs matches the number of clients if len(gpus) != num_clients: raise ValueError(f"Number of GPUs ({len(gpus)}) does not match number of clients ({num_clients}).") - # make sure the number of ports equal or greater than the number of clients if len(ports) < num_clients: raise ValueError(f"Number of ports ({len(ports)}) is less than number of clients ({num_clients}).") - if args.threads: - num_threads = args.threads - else: - num_threads = num_clients - - num_rounds = args.num_rounds - workspace_dir = args.workspace_dir - job_dir = args.job_dir - model_name_or_path = args.model_name_or_path - train_mode = args.train_mode - message_mode = args.message_mode - - # Create the FedJob - if train_mode.lower() == "sft": - job = FedJob(name="llm_hf_sft", min_clients=num_clients) + num_threads = args.threads if args.threads else num_clients + + # Determine train mode and model configuration + train_mode = args.train_mode.lower() + if train_mode == "sft": + from hf_sft_model import CausalLMModel + + initial_model = CausalLMModel(model_name_or_path=args.model_name_or_path) + job_name = "llm_hf_sft" output_path = "sft" - elif train_mode.lower() == "peft": - job = FedJob(name="llm_hf_peft", min_clients=num_clients) + elif train_mode == "peft": + from hf_peft_model import CausalLMPEFTModel + + initial_model = CausalLMPEFTModel(model_name_or_path=args.model_name_or_path) + job_name = "llm_hf_peft" output_path = "peft" else: - raise ValueError(f"Invalid train_mode: {train_mode}, only SFT and PEFT are supported.") - - # Define the FedAvg controller workflow and send to server - controller = FedAvg( - num_clients=num_clients, - num_rounds=num_rounds, - ) - job.to(controller, "server") - - if args.quantize_mode: - # If using quantization, add quantize filters. 
- quantizer = ModelQuantizer(quantization_type=args.quantize_mode) - dequantizer = ModelDequantizer() - job.to(quantizer, "server", tasks=["train"], filter_type=FilterType.TASK_DATA) - job.to(dequantizer, "server", tasks=["train"], filter_type=FilterType.TASK_RESULT) - - # Define the model persistor and send to server - if train_mode.lower() == "sft": - # First send the model to the server - job.to("hf_sft_model.py", "server") - # Then send the model persistor to the server - model_args = {"path": "hf_sft_model.CausalLMModel", "args": {"model_name_or_path": model_name_or_path}} - elif train_mode.lower() == "peft": - # First send the model to the server - job.to("hf_peft_model.py", "server") - # Then send the model persistor to the server - model_args = {"path": "hf_peft_model.CausalLMPEFTModel", "args": {"model_name_or_path": model_name_or_path}} - # When using message_mode="tensor", we need to set allow_numpy_conversion=False - allow_numpy_conversion = message_mode != "tensor" - job.to( - PTFileModelPersistor(model=model_args, allow_numpy_conversion=allow_numpy_conversion), "server", id="persistor" - ) + raise ValueError(f"Invalid train_mode: {train_mode}, only SFT and PEFT are supported (case-insensitive).") + + # Determine message mode and server format + message_mode = args.message_mode.lower() + if message_mode == "tensor": + server_expected_format = "pytorch" + elif message_mode == "numpy": + server_expected_format = "numpy" + else: + raise ValueError(f"Invalid message_mode: {message_mode}, only numpy and tensor are supported.") - # Add model selection widget and send to server - job.to(IntimeModelSelector(key_metric="eval_loss", negate_key_metric=True), "server", id="model_selector") + # Use client_ids directly as site names + client_names = client_ids - # Send ScriptRunner to all clients - client_names = [] - for i in range(num_clients): - client_id = client_ids[i] - site_name = f"site-{client_id}" - client_names.append(site_name) + # Build per_site_config for multi-GPU or multi-node scenarios + per_site_config: Dict[str, Dict] = {} + for idx, client_id in enumerate(client_ids): + site_name = client_names[idx] + site_gpus = gpus[idx] data_path_train = os.path.join(args.data_path, client_id, "training.jsonl") data_path_valid = os.path.join(args.data_path, client_id, "validation.jsonl") - script_args = f"--model_name_or_path {model_name_or_path} --data_path_train {data_path_train} --data_path_valid {data_path_valid} --output_path {output_path} --train_mode {train_mode} --message_mode {message_mode} --num_rounds {num_rounds}" - - # Add WandB arguments if provided - if args.wandb_project: - wandb_run_name = args.wandb_run_name if args.wandb_run_name else f"nvflare_{train_mode.lower()}_{client_id}" - script_args += f" --wandb_project {args.wandb_project} --wandb_run_name {wandb_run_name}" - if message_mode == "tensor": - server_expected_format = "pytorch" - elif message_mode == "numpy": - server_expected_format = "numpy" - else: - raise ValueError(f"Invalid message_mode: {message_mode}, only numpy and tensor are supported.") + # Build script arguments for this site + script_args = ( + f"--model_name_or_path {args.model_name_or_path} " + f"--data_path_train {data_path_train} " + f"--data_path_valid {data_path_valid} " + f"--output_path {output_path} " + f"--train_mode {train_mode} " + f"--message_mode {message_mode} " + f"--num_rounds {args.num_rounds} " + f"--local_epoch {args.local_epoch} " + f"--lr_scheduler {args.lr_scheduler}" + ) + + # Add WandB arguments (will be enabled if 
WANDB_API_KEY is set) + script_args += f" --wandb_project {args.wandb_project} --wandb_run_name {args.wandb_run_name}" + + # Determine command for multi-GPU or multi-node + site_config = {"train_args": script_args} if args.multi_node: - # Multi-GPU/Multi-node distributed training - # Use a wrapper script that handles srun coordination - # The wrapper script will detect if running on single or multiple nodes - print(f"Client {client_id}: Creating multi-node training job with {len(gpus[i])} GPUs per node") - - # Send the wrapper script to the client - job.to("client_wrapper.sh", site_name) - - # Use the wrapper script which will call srun if needed - runner = ScriptRunner( - script=train_script, - script_args=script_args, - server_expected_format=server_expected_format, - launch_external_process=True, - command="bash custom/client_wrapper.sh", - ) - elif len(gpus[i]) == 1: - # Single-GPU training - print(f"Client {client_id}: Creating single-GPU training job with {len(gpus[i])} GPUs") - - runner = ScriptRunner( - script=train_script, - script_args=script_args, - server_expected_format=server_expected_format, - launch_external_process=True, + site_config["command"] = "bash custom/client_wrapper.sh" + elif len(site_gpus) > 1: + site_config["command"] = ( + f"python3 -m torch.distributed.run --nnodes=1 --nproc_per_node={len(site_gpus)} " + f"--master_port={ports[idx]}" ) - elif len(gpus[i]) > 1: - # Multi-GPU training (single node) - print(f"Client {client_id}: Creating multi-GPU training job with {len(gpus[i])} GPUs") - - runner = ScriptRunner( - script=train_script, - script_args=script_args, - server_expected_format=server_expected_format, - launch_external_process=True, - command=f"python3 -m torch.distributed.run --nnodes=1 --nproc_per_node={len(gpus[i])} --master_port={ports[i]}", - ) - job.to(runner, site_name, tasks=["train"]) - if args.quantize_mode: - job.to(quantizer, site_name, tasks=["train"], filter_type=FilterType.TASK_RESULT) - job.to(dequantizer, site_name, tasks=["train"], filter_type=FilterType.TASK_DATA) + per_site_config[site_name] = site_config + + # Create FedAvgRecipe + recipe = FedAvgRecipe( + name=job_name, + initial_model=initial_model, + min_clients=num_clients, + num_rounds=args.num_rounds, + train_script="client.py", + server_expected_format=server_expected_format, + launch_external_process=True, # Always use external process for LLM training + per_site_config=per_site_config, + ) - # Add additional parameters to clients + # Add client params to reduce timeout failures for longer LLM runs + for site_name in client_names: client_params = {"get_task_timeout": 300, "submit_task_result_timeout": 300} - job.to(client_params, site_name) + recipe.job.to(client_params, site_name) + + # Add client_wrapper.sh for multi-node training + if args.multi_node: + for site_name in client_names: + recipe.job.to("client_wrapper.sh", site_name) + + # Add quantization filters if specified + if args.quantize_mode: + from nvflare import FilterType + + quantizer = ModelQuantizer(quantization_type=args.quantize_mode.lower()) + dequantizer = ModelDequantizer() - # Export the job - print("job_dir=", job_dir) - job.export_job(job_dir) + # Add to server + recipe.job.to(quantizer, "server", tasks=["train"], filter_type=FilterType.TASK_DATA) + recipe.job.to(dequantizer, "server", tasks=["train"], filter_type=FilterType.TASK_RESULT) - # Run job using a recipe - recipe = Recipe(job) + # Add to all clients + for site_name in client_names: + recipe.job.to(quantizer, site_name, tasks=["train"], 
filter_type=FilterType.TASK_RESULT) + recipe.job.to(dequantizer, site_name, tasks=["train"], filter_type=FilterType.TASK_DATA) + + # Add experiment tracking if requested + if args.use_tracking: + add_experiment_tracking(recipe, tracking_type="tensorboard") + + # Export job configuration + print("Exporting job to", args.job_dir) + recipe.export(args.job_dir) + print("Job config exported to", args.job_dir) + + # If export-only mode, stop here + if args.export_config: + return + + # Run recipe if args.startup_kit_location: - # Run the job in production mode print("Running job in production mode...") - print("startup_kit_location=", args.startup_kit_location) - print("username=", args.username) env = ProdEnv(startup_kit_location=args.startup_kit_location, username=args.username) - run = recipe.execute(env) - print("Job Status is:", run.get_status()) - print("Job Result is:", run.get_result()) else: - # Run the job in simulation mode print("Running job in simulation mode...") - print("workspace_dir=", workspace_dir) - print("num_threads=", num_threads) - env = SimEnv(clients=client_names, num_threads=num_threads, gpu_config=args.gpu, workspace_root=workspace_dir) - run = recipe.execute(env) - print("Job Status is:", run.get_status()) - print("Job Result is:", run.get_result()) + env = SimEnv( + clients=client_names, num_threads=num_threads, gpu_config=args.gpu, workspace_root=args.workspace_dir + ) - -def define_parser(): - parser = argparse.ArgumentParser() - parser.add_argument( - "--client_ids", - nargs="+", - type=str, - default="", - help="Clinet IDs, used to get the data path for each client", - ) - parser.add_argument( - "--num_rounds", - type=int, - default=3, - help="Number of rounds, default to 3", - ) - parser.add_argument( - "--workspace_dir", - type=str, - default="/tmp/nvflare/jobs/llm_hf/workdir", - help="work directory, default to '/tmp/nvflare/jobs/llm_hf/workdir'", - ) - parser.add_argument( - "--job_dir", - type=str, - default="/tmp/nvflare/jobs/llm_hf/jobdir", - help="directory for job export, default to '/tmp/nvflare/jobs/llm_hf/jobdir'", - ) - parser.add_argument( - "--model_name_or_path", - type=str, - default="meta-llama/llama-3.2-1b", - help="model name or path", - ) - parser.add_argument( - "--data_path", - type=str, - default="", - help="root directory for training and validation data", - ) - parser.add_argument( - "--train_mode", - type=str, - default="SFT", - help="training mode, SFT or PEFT, default to SFT", - ) - parser.add_argument( - "--quantize_mode", - type=str, - default=None, - help="quantization mode, default to None (no quantization)", - ) - parser.add_argument( - "--message_mode", - type=str, - default="numpy", - help="message mode, numpy or tensor, default to numpy", - ) - parser.add_argument( - "--threads", - type=int, - help="number of threads to use for FL simulation, default to the number of clients", - ) - parser.add_argument( - "--gpu", - type=str, - default="0", - help="gpu assignments for simulating clients, comma separated, default to single gpu", - ) - parser.add_argument( - "--ports", - nargs="+", - default="7777", - help="ports for the clients, default to one client 7777", - ) - parser.add_argument( - "--multi_node", - action="store_true", - help="enable multi-node training, default to False", - ) - parser.add_argument( - "--startup_kit_location", - type=str, - default=None, - help="startup kit location, default to None", - ) - parser.add_argument( - "--username", - type=str, - default="admin@nvidia.com", - help="username, default to None", - 
) - parser.add_argument( - "--wandb_project", - type=str, - default=None, - help="WandB project name (enables WandB tracking if provided)", - ) - parser.add_argument( - "--wandb_run_name", - type=str, - default=None, - help="WandB run name (defaults to nvflare__)", - ) - return parser.parse_args() + run = recipe.execute(env) + print() + print("Job Status is:", run.get_status()) + print("Result can be found in:", run.get_result()) + print() if __name__ == "__main__": - print("Starting job...") - args = define_parser() - print("args:", args) - main(args) + main() diff --git a/examples/advanced/llm_hf/nvflare.slurm b/examples/advanced/llm_hf/nvflare.slurm index 4547b6c894..fbd4438ec9 100644 --- a/examples/advanced/llm_hf/nvflare.slurm +++ b/examples/advanced/llm_hf/nvflare.slurm @@ -43,7 +43,7 @@ sleep 10 # start client on first node (if using production mode) echo "Starting NVFlare client..." -${NVFLARE_PROJECT}/site-dolly/startup/start.sh +${NVFLARE_PROJECT}/dolly/startup/start.sh # wait for client to start up (NVFlare will handle connection retry) sleep 20