diff --git a/.meta/mast/README.md b/.meta/mast/README.md index e6f64d739..235f48fcf 100644 --- a/.meta/mast/README.md +++ b/.meta/mast/README.md @@ -21,7 +21,7 @@ The `env_setup.sh` script will automatically: chmod +x .meta/mast/env_setup.sh # Run the setup -./.meta/mast/env_setup.sh +source .meta/mast/env_setup.sh ``` @@ -44,3 +44,82 @@ The launch script will automatically: - Launch the MAST job with the specified config You can run it from anywhere, and it will figure out the correct paths. + + +## How MAST Launcher Works + +The MAST launcher uses a two-stage architecture to run training jobs: + +### Stage 1: Detached Mode (Local Machine) + +When you run `./.meta/mast/launch.sh`, the `main.py` script starts in **detached mode**: + +1. The launcher creates a MAST job with all the worker roles (GPU hosts) +2. It also creates a special **client role** - a CPU-only role that will run inside MAST +3. The client role's entrypoint is set to `client_bootstrap.sh` +4. All CLI arguments you pass are forwarded to the client role + +At this point, the job is submitted to MAST and your local script exits. Everything now runs in the cluster. + +### Stage 2: Remote Mode (Inside MAST) + +The `client_bootstrap.sh` script runs inside the MAST client role and: + +1. Calls `main.py` again, but now with `--mode=remote` +2. In **remote mode**, the script: + - Mounts the OilFS workspace + - Initializes the provisioner to connect to worker roles + - Runs the actual training workload (e.g., GRPO) + +This architecture allows the entire training workflow to run inside MAST without requiring a persistent connection from your local machine. 
+ +### Key Files + +- **`main.py`**: Entry point that handles both detached and remote modes +- **`client_bootstrap.sh`**: Entrypoint for the client role in MAST +- **`launcher.py`**: Creates the MAST job specification and handles role configuration + + +## Managing HuggingFace Models in MAST + +### The Problem: No Internet Access + +MAST compute nodes cannot access the internet, which means they cannot download models directly from HuggingFace. To work around this, we store all HuggingFace models and cache data on OilFS at `/mnt/wsfuse/teamforge/hf`, which is accessible from MAST. + +### Solution: Two-Step Process + +You need to perform both steps below to ensure models work correctly in MAST: + +#### 1. Download Model Weights to OilFS + +First, download the model weights directly to the OilFS path. This should be done from a machine with internet access (like your devserver): + +```bash +# Set HF_HOME to the OilFS path +export HF_HOME=/mnt/wsfuse/teamforge/hf + +# Download the model (replace with your desired model) +huggingface-cli download Qwen/Qwen3-8B --local-dir /mnt/wsfuse/teamforge/hf_artifacts/qwen3_8b +``` + +#### 2. Hydrate the HuggingFace Cache + +After downloading the weights, you need to hydrate the HuggingFace cache so that the transformers library can find the model metadata: + +```bash +# Set HF_HOME to the OilFS path +export HF_HOME=/mnt/wsfuse/teamforge/hf + +# Hydrate the cache for the model +python .meta/mast/hydrate_cache.py --model-id Qwen/Qwen3-8B +``` + +This ensures that when MAST runs with `HF_HUB_OFFLINE=1`, the transformers library can locate all necessary files from the cache. + +### Directory Structure + +Both cache and model files are stored under: +- **Cache**: `/mnt/wsfuse/teamforge/hf` (set via `HF_HOME`) +- **Model weights**: `/mnt/wsfuse/teamforge/hf/` + +Make sure your MAST config files point to the correct paths in `hf_artifacts`. 
diff --git a/.meta/mast/client_bootstrap.sh b/.meta/mast/client_bootstrap.sh new file mode 100755 index 000000000..1e73a7e3d --- /dev/null +++ b/.meta/mast/client_bootstrap.sh @@ -0,0 +1,51 @@ +#!/bin/bash +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. + +# Bootstrap script for the MAST client role +# This script sets up the environment and launches the client training script + +set -eEx + +LIBCUDA="/usr/local/fbcode/platform010/lib/libcuda.so" +if [ -f "$LIBCUDA" ]; then + export LIBCUDA_DIR="${LIBCUDA%/*}" + export TRITON_LIBCUDA_PATH="$LIBCUDA_DIR" + export LD_PRELOAD="$LIBCUDA:/usr/local/fbcode/platform010/lib/libnvidia-ml.so${PRELOAD_PATH:+:$PRELOAD_PATH}" +fi + +# Also put the torch libs on the library path: in the monarch dev workflow we +# don't install it into the env, so we need to make sure the binaries can find +# libtorch and friends on MAST, where the rpaths set during dev install will +# be wrong. +export LD_LIBRARY_PATH="${CONDA_DIR}/lib:${CONDA_DIR}/lib/python3.10/site-packages/torch/lib${LD_LIBRARY_PATH:+:$LD_LIBRARY_PATH}" +export PYTHONPATH="${PYTHONPATH:+$PYTHONPATH:}$TORCHX_RUN_PYTHONPATH" + +# shellcheck disable=SC1091 +if [ -n "$CONDA_PREFIX" ]; then + echo "A conda environment is already activated: $CONDA_DEFAULT_ENV" +else + # Disable command printing to avoid log spew. + set +x + source "${CONDA_DIR}/bin/activate" + # Re-enable command printing after conda activation. + set -x +fi + +if [ -z "$WORKSPACE_DIR" ] || [ ! 
-d "$WORKSPACE_DIR" ]; then + WORKSPACE_DIR="$CONDA_PREFIX" +fi + +cd "$WORKSPACE_DIR/forge" + +export WANDB_MODE=offline +export HF_HUB_OFFLINE=1 +export MONARCH_HOST_MESH_V1_REMOVE_ME_BEFORE_RELEASE=1 +export TORCHSTORE_RDMA_ENABLED=1 +export HF_HOME=/mnt/wsfuse/teamforge/hf + +# Execute the client training script with all passed arguments +exec python -X faulthandler .meta/mast/main.py "$@" diff --git a/.meta/mast/env_setup.sh b/.meta/mast/env_setup.sh index feef663b7..c77675299 100755 --- a/.meta/mast/env_setup.sh +++ b/.meta/mast/env_setup.sh @@ -7,10 +7,9 @@ # LICENSE file in the root directory of this source tree. # setup_forge_env.sh - Setup conda environment and install forge with mounting -set -e # Exit on any error # Configuration -CONDA_ENV_NAME="forge:stable" +CONDA_ENV_NAME="forge:41468b33a03eaf2bf5b44517f418028a" # Colors for output RED='\033[0;31m' @@ -109,8 +108,6 @@ fi # Define paths FBSOURCE_PATH="/data/users/$USER/fbsource" CONDA_SCRIPT_PATH="$FBSOURCE_PATH/genai/xlformers/dev/xl_conda.sh" -FORGE_BASE_DIR="/data/users/$USER" -FORGE_REPO_DIR="$FORGE_BASE_DIR/forge" # Workspace URL for mounting WORKSPACE_URL="ws://ws.ai.pci0ai/genai_fair_llm" @@ -143,63 +140,12 @@ fi log_info "Conda environment activated successfully" -# Step 3: Create and navigate to forge base directory -log_info "Step 3: Setting up forge directory..." -if [ ! -d "$FORGE_BASE_DIR" ]; then - log_info "Creating forge base directory: $FORGE_BASE_DIR" - mkdir -p "$FORGE_BASE_DIR" -fi - -cd "$FORGE_BASE_DIR" -log_info "Changed to directory: $(pwd)" - -# Step 4: Clone or update forge repository -log_info "Step 4: Setting up forge git repository..." -if [ -d "$FORGE_REPO_DIR" ]; then - log_warn "Forge repository already exists at: $FORGE_REPO_DIR" - cd "$FORGE_REPO_DIR" - - if [ -d ".git" ]; then - log_info "Updating existing repository..." - git fetch origin - if [ $? 
-eq 0 ]; then - log_info "Repository updated successfully" - else - log_warn "Failed to fetch updates, continuing with existing code" - fi - else - log_error "Directory exists but is not a git repository" - log_info "Removing directory and cloning fresh..." - cd "$FORGE_BASE_DIR" - rm -rf "$FORGE_REPO_DIR" - git clone git@github.com:meta-pytorch/forge.git - if [ $? -ne 0 ]; then - log_error "Failed to clone forge repository" - exit 1 - fi - cd "$FORGE_REPO_DIR" - fi -else - log_info "Cloning forge repository..." - git clone git@github.com:meta-pytorch/forge.git - if [ $? -ne 0 ]; then - log_error "Failed to clone forge repository" - log_error "Please ensure:" - log_error "1. You have SSH access to github.com" - log_error "2. Your SSH key is added to GitHub" - log_error "3. You have access to meta-pytorch/forge repository" - exit 1 - fi - cd "$FORGE_REPO_DIR" -fi - -log_info "Current directory: $(pwd)" -# Step 5: Install torchtitan -log_info "Step 5: Installing torchtitan..." +# Step 3: Install torchtitan +log_info "Step 3: Installing torchtitan..." # Source versions.sh to get the pinned commit -VERSIONS_FILE="$FORGE_REPO_DIR/assets/versions.sh" +VERSIONS_FILE="assets/versions.sh" if [ -f "$VERSIONS_FILE" ]; then log_info "Sourcing version information from: $VERSIONS_FILE" source "$VERSIONS_FILE" @@ -225,8 +171,8 @@ else exit 1 fi -# Step 5.5: Apply monarch torch import hack -log_info "Step 5.5: Applying monarch torch import hack..." +# Step 3.5: Apply monarch torch import hack +log_info "Step 3.5: Applying monarch torch import hack..." MONARCH_INIT="$CONDA_PREFIX/lib/python3.10/site-packages/monarch/__init__.py" if [ -f "$MONARCH_INIT" ]; then @@ -259,8 +205,8 @@ else log_warn "Skipping monarch torch import hack (monarch may not be installed yet)" fi -# Step 6: Install forge package -log_info "Step 6: Installing forge package..." +# Step 4: Install forge package +log_info "Step 4: Installing forge package..." pip install --no-deps --force-reinstall . if [ $? 
-ne 0 ]; then log_error "Failed to install forge package" @@ -298,7 +244,11 @@ pip list | grep -E "(forge|monarch)" || log_warn "No forge/monarch packages foun log_info "Environment setup complete! You can now run your scripts." log_info "Mounted workspace available at: /mnt/wsfuse" -# Step 6: Ask user to deactivate and activate conda env conda environment +log_info "Unsetting CUDA_HOME and overwriting the LD_LIBRARY_PATH" +unset CUDA_HOME +export LD_LIBRARY_PATH=${CONDA_PREFIX}/lib + +# Step 5: Ask user to test echo "" log_info "Installation completed successfully!" echo "" diff --git a/.meta/mast/hydrate_cache.py b/.meta/mast/hydrate_cache.py new file mode 100644 index 000000000..6289aee8a --- /dev/null +++ b/.meta/mast/hydrate_cache.py @@ -0,0 +1,56 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. + +"""This is a convenience script meant for hydrating the HuggingFace cache. + +This is meant for downloading the model weights and tokenizer to the cache, i.e. for +OilFS. + +Example: + +python .meta/mast/hydrate_cache.py --model-id Qwen/Qwen3-32B + +""" +import argparse +import os +import sys + +from transformers import AutoModelForCausalLM, AutoTokenizer + + +def main(): + parser = argparse.ArgumentParser( + description="Hydrate HuggingFace cache for a specific model" + ) + parser.add_argument( + "--model-id", + type=str, + required=True, + help="HuggingFace model ID (e.g., Qwen/Qwen3-8B)", + ) + args = parser.parse_args() + + # Ensure HF_HOME is set + hf_home = os.environ.get("HF_HOME") + if not hf_home: + print( + "ERROR: HF_HOME environment variable must be set. " + "You will likely want to run export HF_HOME=/mnt/wsfuse/teamforge/hf." 
+ ) + sys.exit(1) + + print(f"Using HF_HOME: {hf_home}") + print(f"Downloading {args.model_id}...") + + # This will pull tokenizer + config + all weight shards + tokenizer = AutoTokenizer.from_pretrained(args.model_id, trust_remote_code=True) + model = AutoModelForCausalLM.from_pretrained(args.model_id, trust_remote_code=True) + + print("Download complete. Cache hydrated.") + + +if __name__ == "__main__": + main() diff --git a/.meta/mast/launch.sh b/.meta/mast/launch.sh index 46da56d12..2ece4e58e 100755 --- a/.meta/mast/launch.sh +++ b/.meta/mast/launch.sh @@ -34,6 +34,12 @@ fi CONFIG_FILE="$1" +# Generate a unique job name +USER=$(whoami) +RANDOM_SUFFIX=$(cat /dev/urandom | tr -dc 'a-z0-9' | fold -w 6 | head -n 1) +JOB_NAME="${USER}-forge-${RANDOM_SUFFIX}" +log_info "Generated job name: $JOB_NAME" + # Get the directory where this script is located SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" @@ -64,5 +70,10 @@ fi log_info "Successfully reinstalled forge package" # Launch the job +CHECKPOINT_FOLDER=/mnt/wsfuse/teamforge/forge_runs/$JOB_NAME log_info "Launching MAST job..." -PYTHONPATH=. python .meta/mast/main.py --config "$CONFIG_FILE" + +# Manually override the relevant checkpoint path(s) +# This unfortunately cannot be done in the YAML itself since this should be +# based on job name... +PYTHONPATH=. python .meta/mast/main.py --job-name $JOB_NAME --config $CONFIG_FILE trainer.checkpoint.folder=${CHECKPOINT_FOLDER} trainer.dcp_path=${CHECKPOINT_FOLDER} diff --git a/.meta/mast/main.py b/.meta/mast/main.py index cd5de0be9..f5b81e25d 100644 --- a/.meta/mast/main.py +++ b/.meta/mast/main.py @@ -4,13 +4,18 @@ # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. 
+import argparse import asyncio -import getpass -import uuid +import sys from apps.grpo.main import main as grpo_main from forge.cli.config import parse -from forge.controller.launcher import JOB_NAME_KEY, LAUNCHER_KEY +from forge.controller.launcher import ( + JOB_NAME_KEY, + LAUNCHER_KEY, + MastLauncher, + mount_mnt_directory, +) from forge.controller.provisioner import init_provisioner from forge.types import ( @@ -26,42 +31,73 @@ DEFAULT_CHECKPOINT_FOLDER = "/mnt/wsfuse/teamforge/forge_runs/" -async def main(cfg: DictConfig): - """Main module for launching mast jobs for GRPO training.""" +async def main(cfg: DictConfig, mode: str = "detached", extra_args: list = None): + """Main module for launching mast jobs for GRPO training. + + Args: + cfg: Configuration dictionary + mode: "detached" (default) launches MAST job with client in MAST, + "remote" runs training directly (used when client runs in MAST) + extra_args: Additional CLI arguments to pass through to the client + """ if cfg.get(LAUNCHER_KEY, Launcher.MAST.value) != Launcher.MAST.value: raise ValueError("Launcher must be MAST.") - if cfg.get(JOB_NAME_KEY, None) is not None: - # prepend user name to the job to avoid name collision - cfg[JOB_NAME_KEY] = f"{getpass.getuser()}-{cfg[JOB_NAME_KEY]}" - print(f"Overriding mast job name to {cfg[JOB_NAME_KEY]}") + # Job name should already be set from CLI args in __main__ section + # No need to modify it further here + if cfg.get(JOB_NAME_KEY, None) is None: + raise ValueError("Job name is required but not provided") - if cfg.get(DEFAULT_CHECKPOINT_FOLDER_KEY, DEFAULT_CHECKPOINT_FOLDER) is not None: - # append job_name and guid to CP folder path to avoid path collision - if cfg[DEFAULT_CHECKPOINT_FOLDER_KEY] == DEFAULT_CHECKPOINT_FOLDER: - cfg[ - DEFAULT_CHECKPOINT_FOLDER_KEY - ] = f"{cfg[DEFAULT_CHECKPOINT_FOLDER_KEY]}{cfg[JOB_NAME_KEY]}-{uuid.uuid4().hex[:6]}" - print(f"Overriding checkpoint folder to {cfg[DEFAULT_CHECKPOINT_FOLDER_KEY]}") + launcher_config = 
LauncherConfig( + launcher=Launcher(cfg.get(LAUNCHER_KEY, Launcher.MAST.value)), + job_name=cfg.get(JOB_NAME_KEY, None), + services={k: ServiceConfig(**v) for k, v in cfg.services.items()}, + actors={k: ProcessConfig(**v) for k, v in cfg.actors.items()}, + ) - # init mast provisioner - await init_provisioner( - ProvisionerConfig( - launcher_config=LauncherConfig( - launcher=Launcher(cfg.get(LAUNCHER_KEY, Launcher.MAST.value)), - job_name=cfg.get(JOB_NAME_KEY, None), - services={k: ServiceConfig(**v) for k, v in cfg.services.items()}, - actors={k: ProcessConfig(**v) for k, v in cfg.actors.items()}, - ) + if mode == "detached": + # In detached mode, just launch the MAST job with client role included + launcher = MastLauncher( + launcher_config, + detached=True, + extra_args=extra_args or [], ) - ) - await grpo_main(cfg) + await launcher.launch_mast_job() + print(f"MAST job {launcher.job_name} launched successfully with client role.") + print("The client is running inside MAST and will execute the training.") + else: + # In remote mode, we're already running inside MAST, so mount directory, init provisioner and run training + mount_mnt_directory("/mnt/wsfuse") + await init_provisioner(ProvisionerConfig(launcher_config=launcher_config)) + await grpo_main(cfg) if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument( + "--mode", + type=str, + default="detached", + choices=["detached", "remote"], + help="Run mode: 'detached' for launching MAST job with client in MAST, 'remote' for running training directly", + ) + parser.add_argument( + "--job-name", + type=str, + default=None, + help="MAST job name (required - generated by launch.sh)", + ) + args, remaining = parser.parse_known_args() + + # Replace sys.argv with remaining args so @parse can work + sys.argv = [sys.argv[0]] + remaining @parse def _main(cfg): - asyncio.run(main(cfg)) + # Override job name from CLI + if args.job_name: + cfg[JOB_NAME_KEY] = args.job_name + print(f"Using job name: 
{args.job_name}") + asyncio.run(main(cfg, mode=args.mode, extra_args=remaining)) _main() # @parse grabs the cfg from CLI diff --git a/.meta/mast/qwen3_14b_mast.yaml b/.meta/mast/qwen3_14b_mast.yaml index f1f05825f..786f0103c 100644 --- a/.meta/mast/qwen3_14b_mast.yaml +++ b/.meta/mast/qwen3_14b_mast.yaml @@ -3,14 +3,13 @@ # Global configuration group_size: 8 -batch_size: 16 +local_batch_size: 16 # per-device batch size max_req_tokens: 512 max_res_tokens: 512 model: "Qwen/Qwen3-14B" off_by_n: 1 # Off by one by default launcher: mast job_name: forge-qwen3-14b -checkpoint_folder: /mnt/wsfuse/teamforge/forge_runs/ # Main loop configuration rollout_threads: ${services.policy.num_replicas} # Recommended to set equal to policy.num_replicas @@ -26,7 +25,7 @@ metric_logging: # Dataset configuration dataset: - path: "openai/gsm8k" + path: /mnt/wsfuse/teamforge/hf/gsm8k revision: "main" data_split: "train" streaming: true @@ -35,7 +34,7 @@ dataset: # Policy configuration policy: engine_args: # https://docs.vllm.ai/en/v0.10.0/api/vllm/engine/arg_utils.html#vllm.engine.arg_utils.EngineArgs - model: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-14B/snapshots/8268fe3026cb304910457689366670e803a6fd56 + model: /mnt/wsfuse/teamforge/hf/qwen3_14b tensor_parallel_size: 2 pipeline_parallel_size: 1 enforce_eager: false @@ -53,7 +52,7 @@ trainer: model: name: qwen3 flavor: 14B - hf_assets_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-14B/snapshots/8268fe3026cb304910457689366670e803a6fd56 + hf_assets_path: /mnt/wsfuse/teamforge/hf/qwen3_14b optimizer: name: AdamW lr: 1e-5 @@ -61,7 +60,7 @@ trainer: lr_scheduler: warmup_steps: 1 training: - local_batch_size: ${batch_size} + local_batch_size: ${local_batch_size} seq_len: 2048 max_norm: 1.0 steps: 1000000 @@ -79,8 +78,9 @@ trainer: disable_loss_parallel: true checkpoint: enable: true - initial_load_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-14B/snapshots/8268fe3026cb304910457689366670e803a6fd56 + 
initial_load_path: /mnt/wsfuse/teamforge/hf/qwen3_14b initial_load_in_hf: true + folder: ${checkpoint_folder} last_save_in_hf: true interval: 500 async_mode: "disabled" @@ -95,7 +95,7 @@ trainer: # Replay buffer configuration replay_buffer: - batch_size: ${batch_size} + batch_size: ${local_batch_size} max_policy_age: ${off_by_n} dp_size: ${trainer.parallelism.data_parallel_shard_degree} # Must equal trainer DP degree @@ -104,7 +104,7 @@ ref_model: model: name: qwen3 flavor: 14B - hf_assets_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-14B/snapshots/8268fe3026cb304910457689366670e803a6fd56 + hf_assets_path: /mnt/wsfuse/teamforge/hf/qwen3_14b training: dtype: bfloat16 gc_freq: 1 @@ -119,7 +119,8 @@ ref_model: expert_parallel_degree: 1 checkpoint: enable: true - initial_load_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-14B/snapshots/8268fe3026cb304910457689366670e803a6fd56 + initial_load_path: /mnt/wsfuse/teamforge/hf/qwen3_14b + folder: "" initial_load_in_hf: true comm: # TODO: needs to be revisited. 
causing NCCL timeouts on inits when loading CP diff --git a/.meta/mast/qwen3_1_7b_mast.yaml b/.meta/mast/qwen3_1_7b_mast.yaml index 39aaf01ba..4065cf07a 100644 --- a/.meta/mast/qwen3_1_7b_mast.yaml +++ b/.meta/mast/qwen3_1_7b_mast.yaml @@ -3,14 +3,13 @@ # Global configuration group_size: 8 -batch_size: 16 +local_batch_size: 16 # per-device batch size max_req_tokens: 512 max_res_tokens: 512 model: "Qwen/Qwen3-1.7B" off_by_n: 1 # Off by one by default launcher: mast job_name: forge-qwen3-1_7b -checkpoint_folder: /mnt/wsfuse/teamforge/forge_runs/ # Main loop configuration rollout_threads: ${services.policy.num_replicas} # Recommended to set equal to policy.num_replicas @@ -26,7 +25,7 @@ metric_logging: # Dataset configuration dataset: - path: "openai/gsm8k" + path: /mnt/wsfuse/teamforge/hf/gsm8k revision: "main" data_split: "train" streaming: true @@ -35,7 +34,7 @@ dataset: # Policy configuration policy: engine_args: # https://docs.vllm.ai/en/v0.10.0/api/vllm/engine/arg_utils.html#vllm.engine.arg_utils.EngineArgs - model: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-1.7B/snapshots/0060bc56d46589041c1048efd1a397421b1142b5 + model: /mnt/wsfuse/teamforge/hf/qwen3_1.7b tensor_parallel_size: 1 pipeline_parallel_size: 1 enforce_eager: false @@ -53,7 +52,8 @@ trainer: model: name: qwen3 flavor: 1.7B - hf_assets_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-1.7B/snapshots/0060bc56d46589041c1048efd1a397421b1142b5 + hf_assets_path: /mnt/wsfuse/teamforge/hf/qwen3_1.7b + # hf_assets_path: hf://${model} optimizer: name: AdamW lr: 1e-5 @@ -61,7 +61,7 @@ trainer: lr_scheduler: warmup_steps: 1 training: - local_batch_size: ${batch_size} + local_batch_size: ${local_batch_size} seq_len: 2048 max_norm: 1.0 steps: 1000000 @@ -79,8 +79,9 @@ trainer: disable_loss_parallel: true checkpoint: enable: true - initial_load_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-1.7B/snapshots/0060bc56d46589041c1048efd1a397421b1142b5 + initial_load_path: 
/mnt/wsfuse/teamforge/hf/qwen3_1.7b initial_load_in_hf: true + folder: ${checkpoint_folder} last_save_in_hf: true interval: 500 async_mode: "disabled" @@ -95,7 +96,7 @@ trainer: # Replay buffer configuration replay_buffer: - batch_size: ${batch_size} + batch_size: ${local_batch_size} max_policy_age: ${off_by_n} dp_size: ${trainer.parallelism.data_parallel_shard_degree} # Must equal trainer DP degree @@ -104,7 +105,8 @@ ref_model: model: name: qwen3 flavor: 1.7B - hf_assets_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-1.7B/snapshots/0060bc56d46589041c1048efd1a397421b1142b5 + hf_assets_path: /mnt/wsfuse/teamforge/hf/qwen3_1.7b + # hf_assets_path: hf://${model} training: dtype: bfloat16 gc_freq: 1 @@ -119,20 +121,21 @@ ref_model: expert_parallel_degree: 1 checkpoint: enable: true - initial_load_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-1.7B/snapshots/0060bc56d46589041c1048efd1a397421b1142b5 + initial_load_path: /mnt/wsfuse/teamforge/hf/qwen3_1.7b + folder: "" initial_load_in_hf: true # All resource allocations services: policy: procs: ${policy.engine_args.tensor_parallel_size} - num_replicas: 2 + num_replicas: 1 with_gpus: true mesh_name: policy hosts: 1 ref_model: procs: 1 - num_replicas: 2 + num_replicas: 1 with_gpus: true mesh_name: ref_model hosts: 1 diff --git a/.meta/mast/qwen3_32b_mast.yaml b/.meta/mast/qwen3_32b_mast.yaml index 2dc25509d..713c1f784 100644 --- a/.meta/mast/qwen3_32b_mast.yaml +++ b/.meta/mast/qwen3_32b_mast.yaml @@ -3,14 +3,13 @@ # Global configuration group_size: 8 -batch_size: 16 +local_batch_size: 16 # per-device batch size max_req_tokens: 512 max_res_tokens: 512 model: "Qwen/Qwen3-32B" off_by_n: 1 # Off by one by default launcher: mast job_name: forge-qwen3-32b -checkpoint_folder: /mnt/wsfuse/teamforge/forge_runs/ # Main loop configuration rollout_threads: ${services.policy.num_replicas} # Recommended to set equal to policy.num_replicas @@ -26,7 +25,7 @@ metric_logging: # Dataset configuration dataset: - path: 
"openai/gsm8k" + path: /mnt/wsfuse/teamforge/hf/gsm8k revision: "main" data_split: "train" streaming: true @@ -35,7 +34,7 @@ dataset: # Policy configuration policy: engine_args: # https://docs.vllm.ai/en/v0.10.0/api/vllm/engine/arg_utils.html#vllm.engine.arg_utils.EngineArgs - model: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-32B/snapshots/d47b0d4ae4b48fde975756bf360a63a9cca8d470 + model: /mnt/wsfuse/teamforge/hf/qwen3_32b tensor_parallel_size: 2 pipeline_parallel_size: 1 enforce_eager: false @@ -53,7 +52,7 @@ trainer: model: name: qwen3 flavor: 32B - hf_assets_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-32B/snapshots/d47b0d4ae4b48fde975756bf360a63a9cca8d470 + hf_assets_path: /mnt/wsfuse/teamforge/hf/qwen3_32b optimizer: name: AdamW lr: 1e-5 @@ -61,7 +60,7 @@ trainer: lr_scheduler: warmup_steps: 1 training: - local_batch_size: ${batch_size} + local_batch_size: ${local_batch_size} seq_len: 2048 max_norm: 1.0 steps: 1000000 @@ -79,8 +78,9 @@ trainer: disable_loss_parallel: true checkpoint: enable: true - initial_load_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-32B/snapshots/d47b0d4ae4b48fde975756bf360a63a9cca8d470 + initial_load_path: /mnt/wsfuse/teamforge/hf/qwen3_32b initial_load_in_hf: true + folder: ${checkpoint_folder} last_save_in_hf: true interval: 500 async_mode: "disabled" @@ -95,7 +95,7 @@ trainer: # Replay buffer configuration replay_buffer: - batch_size: ${batch_size} + batch_size: ${local_batch_size} max_policy_age: ${off_by_n} dp_size: ${trainer.parallelism.data_parallel_shard_degree} # Must equal trainer DP degree @@ -104,7 +104,7 @@ ref_model: model: name: qwen3 flavor: 32B - hf_assets_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-32B/snapshots/d47b0d4ae4b48fde975756bf360a63a9cca8d470 + hf_assets_path: /mnt/wsfuse/teamforge/hf/qwen3_32b training: dtype: bfloat16 gc_freq: 1 @@ -119,7 +119,8 @@ ref_model: expert_parallel_degree: 1 checkpoint: enable: true - initial_load_path: 
/mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-32B/snapshots/d47b0d4ae4b48fde975756bf360a63a9cca8d470 + initial_load_path: /mnt/wsfuse/teamforge/hf/qwen3_32b + folder: "" initial_load_in_hf: true comm: # TODO: needs to be revisited. causing NCCL timeouts on inits when loading CP diff --git a/.meta/mast/qwen3_4b_mast.yaml b/.meta/mast/qwen3_4b_mast.yaml index 5e74f4b2a..e11e2a25a 100644 --- a/.meta/mast/qwen3_4b_mast.yaml +++ b/.meta/mast/qwen3_4b_mast.yaml @@ -3,14 +3,13 @@ # Global configuration group_size: 8 -batch_size: 16 +local_batch_size: 16 # per-device batch size max_req_tokens: 512 max_res_tokens: 512 model: "Qwen/Qwen3-4B" off_by_n: 1 # Off by one by default launcher: mast job_name: forge-qwen3-4b -checkpoint_folder: /mnt/wsfuse/teamforge/forge_runs/ # Main loop configuration rollout_threads: ${services.policy.num_replicas} # Recommended to set equal to policy.num_replicas @@ -26,7 +25,7 @@ metric_logging: # Dataset configuration dataset: - path: "openai/gsm8k" + path: /mnt/wsfuse/teamforge/hf/gsm8k revision: "main" data_split: "train" streaming: true @@ -35,7 +34,7 @@ dataset: # Policy configuration policy: engine_args: # https://docs.vllm.ai/en/v0.10.0/api/vllm/engine/arg_utils.html#vllm.engine.arg_utils.EngineArgs - model: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-4B-Base/snapshots/a81b894c2624d21c88a3ad737ce4f837424b7eed + model: /mnt/wsfuse/teamforge/hf/qwen3_4b tensor_parallel_size: 2 pipeline_parallel_size: 1 enforce_eager: false @@ -53,7 +52,8 @@ trainer: model: name: qwen3 flavor: 4B - hf_assets_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-4B-Base/snapshots/a81b894c2624d21c88a3ad737ce4f837424b7eed + hf_assets_path: /mnt/wsfuse/teamforge/hf/qwen3_4b + # hf_assets_path: hf://${model} optimizer: name: AdamW lr: 1e-5 @@ -61,7 +61,7 @@ trainer: lr_scheduler: warmup_steps: 1 training: - local_batch_size: ${batch_size} + local_batch_size: ${local_batch_size} seq_len: 2048 max_norm: 1.0 steps: 1000000 @@ -79,8 +79,9 @@ trainer: 
disable_loss_parallel: true checkpoint: enable: true - initial_load_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-4B-Base/snapshots/a81b894c2624d21c88a3ad737ce4f837424b7eed + initial_load_path: /mnt/wsfuse/teamforge/hf/qwen3_4b initial_load_in_hf: true + folder: ${checkpoint_folder} last_save_in_hf: true interval: 500 async_mode: "disabled" @@ -95,7 +96,7 @@ trainer: # Replay buffer configuration replay_buffer: - batch_size: ${batch_size} + batch_size: ${local_batch_size} max_policy_age: ${off_by_n} dp_size: ${trainer.parallelism.data_parallel_shard_degree} # Must equal trainer DP degree @@ -104,7 +105,8 @@ ref_model: model: name: qwen3 flavor: 4B - hf_assets_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-4B-Base/snapshots/a81b894c2624d21c88a3ad737ce4f837424b7eed + hf_assets_path: /mnt/wsfuse/teamforge/hf/qwen3_4b + # hf_assets_path: hf://${model} training: dtype: bfloat16 gc_freq: 1 @@ -119,7 +121,8 @@ ref_model: expert_parallel_degree: 1 checkpoint: enable: true - initial_load_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-4B-Base/snapshots/a81b894c2624d21c88a3ad737ce4f837424b7eed + initial_load_path: /mnt/wsfuse/teamforge/hf/qwen3_4b + folder: "" initial_load_in_hf: true # All resource allocations @@ -144,7 +147,7 @@ services: actors: dataset: - procs: 8 + procs: 1 with_gpus: false mesh_name: dataset trainer: diff --git a/.meta/mast/qwen3_8b_mast.yaml b/.meta/mast/qwen3_8b_mast.yaml index 7f5b49af6..0405d767f 100644 --- a/.meta/mast/qwen3_8b_mast.yaml +++ b/.meta/mast/qwen3_8b_mast.yaml @@ -3,14 +3,13 @@ # Global configuration group_size: 8 -batch_size: 16 +local_batch_size: 16 # per-device batch size max_req_tokens: 512 max_res_tokens: 512 model: "Qwen/Qwen3-8B" off_by_n: 1 # Off by one by default launcher: mast job_name: forge-qwen3-8b -checkpoint_folder: /mnt/wsfuse/teamforge/forge_runs/ # Main loop configuration rollout_threads: ${services.policy.num_replicas} # Recommended to set equal to policy.num_replicas @@ -26,7 +25,7 @@ 
metric_logging: # Dataset configuration dataset: - path: "openai/gsm8k" + path: /mnt/wsfuse/teamforge/hf/gsm8k revision: "main" data_split: "train" streaming: true @@ -35,7 +34,7 @@ dataset: # Policy configuration policy: engine_args: # https://docs.vllm.ai/en/v0.10.0/api/vllm/engine/arg_utils.html#vllm.engine.arg_utils.EngineArgs - model: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-8B/snapshots/model + model: /mnt/wsfuse/teamforge/hf/qwen3_8b tensor_parallel_size: 2 pipeline_parallel_size: 1 enforce_eager: false @@ -53,7 +52,7 @@ trainer: model: name: qwen3 flavor: 8B - hf_assets_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-8B/snapshots/model + hf_assets_path: /mnt/wsfuse/teamforge/hf/qwen3_8b optimizer: name: AdamW lr: 1e-5 @@ -61,7 +60,7 @@ trainer: lr_scheduler: warmup_steps: 1 training: - local_batch_size: ${batch_size} + local_batch_size: ${local_batch_size} seq_len: 2048 max_norm: 1.0 steps: 1000000 @@ -79,8 +78,9 @@ trainer: disable_loss_parallel: true checkpoint: enable: true - initial_load_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-8B/snapshots/model + initial_load_path: /mnt/wsfuse/teamforge/hf/qwen3_8b initial_load_in_hf: true + folder: ${checkpoint_folder} last_save_in_hf: true interval: 500 async_mode: "disabled" @@ -95,7 +95,7 @@ trainer: # Replay buffer configuration replay_buffer: - batch_size: ${batch_size} + batch_size: ${local_batch_size} max_policy_age: ${off_by_n} dp_size: ${trainer.parallelism.data_parallel_shard_degree} # Must equal trainer DP degree @@ -104,7 +104,7 @@ ref_model: model: name: qwen3 flavor: 8B - hf_assets_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-8B/snapshots/model + hf_assets_path: /mnt/wsfuse/teamforge/hf/qwen3_8b training: dtype: bfloat16 gc_freq: 1 @@ -119,7 +119,8 @@ ref_model: expert_parallel_degree: 1 checkpoint: enable: true - initial_load_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-8B/snapshots/model + initial_load_path: /mnt/wsfuse/teamforge/hf/qwen3_8b + 
folder: "" initial_load_in_hf: true # All resource allocations diff --git a/apps/grpo/qwen3_8b.yaml b/apps/grpo/qwen3_8b.yaml index 534e5b92a..4a8858269 100644 --- a/apps/grpo/qwen3_8b.yaml +++ b/apps/grpo/qwen3_8b.yaml @@ -29,7 +29,7 @@ dataset: # Policy configuration policy: engine_args: # https://docs.vllm.ai/en/v0.10.0/api/vllm/engine/arg_utils.html#vllm.engine.arg_utils.EngineArgs - model: ${model} + model: /mnt/wsfuse/teamforge/hf_artifacts/qwen3_8b tensor_parallel_size: 2 pipeline_parallel_size: 1 enforce_eager: false diff --git a/assets/versions.sh b/assets/versions.sh index 7c188b0d5..49a755dc0 100644 --- a/assets/versions.sh +++ b/assets/versions.sh @@ -15,5 +15,5 @@ VLLM_BRANCH="v0.10.0" # Commit hashes MONARCH_COMMIT="195503223b5c2896846171f60ac99dc6868f8f2c" -TORCHTITAN_COMMIT="0cfbd0b3c2d827af629a107a77a9e47229c31663" +TORCHTITAN_COMMIT="d0e25450bcac2332359b13fbda430dc701f073d4" TORCHSTORE_COMMIT="662299faf4fd50ee30bd9aa3f4ce8c0e2db1d310" diff --git a/src/forge/controller/launcher.py b/src/forge/controller/launcher.py index 65ece6597..333acbe32 100644 --- a/src/forge/controller/launcher.py +++ b/src/forge/controller/launcher.py @@ -4,10 +4,12 @@ # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. +"""Launcher specific logic (i.e. SLURM, k8s when supported, etc.)""" + +import copy import getpass import os import subprocess - import tempfile import uuid from typing import Any @@ -46,6 +48,58 @@ LAUNCHER_KEY = "launcher" +def mount_mnt_directory(mount_dst: str) -> None: + """Mounts the MAST remote directory to the specified destination. + + This function mounts a remote workspace directory that contains huggingface models + and other shared resources needed for training. 
+ + Args: + mount_dst: Destination path where the directory should be mounted (e.g., "/mnt/wsfuse") + """ + # Sanity check of the mounted directory + sanity_path = os.path.join(mount_dst, "huggingface_models/") + if os.path.exists(sanity_path): + return + + # Otherwise, mount the directory + if not os.path.exists(mount_dst): + os.makedirs(mount_dst, exist_ok=True) + + # Store original LD_LIBRARY_PATH to restore after mounting + original_ld_library_path = os.environ.get("LD_LIBRARY_PATH", "") + + try: + clean_env = os.environ.copy() + if "LD_LIBRARY_PATH" in clean_env: + del clean_env["LD_LIBRARY_PATH"] + + subprocess.run( + [ + "/packages/oil.oilfs/oilfs-wrapper", + "ws://ws.ai.pci0ai/genai_fair_llm", + mount_dst, + ], + capture_output=True, + text=True, + check=True, + env=clean_env, + ) + print("Done mounting") + except subprocess.CalledProcessError as e: + print(f"Get error during mounting {e}, Stderr: {e.stderr}, Stdout: {e.stdout}") + finally: + # Restore original LD_LIBRARY_PATH + if original_ld_library_path: + os.environ["LD_LIBRARY_PATH"] = original_ld_library_path + elif "LD_LIBRARY_PATH" in os.environ: + del os.environ["LD_LIBRARY_PATH"] + + assert os.path.exists( + sanity_path + ), f"Did not find directory {sanity_path}; something wrong with mounting." 
+ + class MastSetupActor(Actor): @endpoint def mount(self, mount_dst: str): @@ -56,53 +110,7 @@ def mount(self, mount_dst: str): if current_rank().rank % proc_count != 0: # Only use one rank per host to mount the directory return - self.mount_mnt_directory(mount_dst) - - def mount_mnt_directory(self, mount_dst: str) -> None: - # Sanity check of the mounted directory - sanity_path = os.path.join(mount_dst, "huggingface_models/") - if os.path.exists(sanity_path): - print(f"Found directory {sanity_path}; skip mounting.") - return - - # Otherwise, mount the directory - if not os.path.exists(mount_dst): - os.makedirs(mount_dst, exist_ok=True) - - # Store original LD_LIBRARY_PATH to restore after mounting - original_ld_library_path = os.environ.get("LD_LIBRARY_PATH", "") - - try: - clean_env = os.environ.copy() - if "LD_LIBRARY_PATH" in clean_env: - del clean_env["LD_LIBRARY_PATH"] - - subprocess.run( - [ - "/packages/oil.oilfs/oilfs-wrapper", - "ws://ws.ai.pci0ai/genai_fair_llm", - mount_dst, - ], - capture_output=True, - text=True, - check=True, - env=clean_env, - ) - print("Done mounting") - except subprocess.CalledProcessError as e: - print( - f"Get error during mounting {e}, Stderr: {e.stderr}, Stdout: {e.stdout}" - ) - finally: - # Restore original LD_LIBRARY_PATH - if original_ld_library_path: - os.environ["LD_LIBRARY_PATH"] = original_ld_library_path - elif "LD_LIBRARY_PATH" in os.environ: - del os.environ["LD_LIBRARY_PATH"] - - assert os.path.exists( - sanity_path - ), f"Did not find directory {sanity_path}; something wrong with mounting." + mount_mnt_directory(mount_dst) class BaseLauncher: @@ -157,18 +165,49 @@ async def remote_setup(self, procs: ProcMesh) -> None: return -class Mastlauncher(BaseLauncher): - def __init__(self, cfg: LauncherConfig | None = None): +class MastLauncher(BaseLauncher): + """Launcher for MAST (Meta's internal cluster scheduler). + + This launcher supports two modes of operation: + + 1. 
Non-detached mode (detached=False): + - Client runs on your local machine/devserver + - Only worker roles (GPU hosts) are launched in MAST + - Client connects to workers remotely via provisioner + + 2. Detached mode (detached=True): + - Client runs entirely inside MAST as a separate role + - Both client role (CPU-only) and worker roles (GPU) are launched in MAST + - Client role executes the training script with --mode=remote + - Everything runs in the cluster, no client needed on local machine + + Args: + cfg: Launcher configuration including job name, services, and actors + detached: If True, adds a client role to the MAST job appdef that runs + the training script inside MAST. If False, only launches worker + roles and expects the client to run on local machine. + extra_args: Additional CLI arguments to pass through to the client role. + + """ + + def __init__( + self, + cfg: LauncherConfig | None = None, + detached: bool = False, + extra_args: list = None, + ): assert cfg is not None self.cfg = cfg + self.detached = detached self.default_monarch_port = 26600 + self.extra_args = extra_args or [] self.scheduler_name = "mast_conda" - # TODO: enabe taking this from config + # TODO: enable taking this from config self.sku = "gtt_any" self.timeout_sec = 1 * 60 * 60 # Kill the job if idle for 1 hour self.user = getpass.getuser() - self.work_dir = f"/data/users/{self.user}" + self.work_dir = f"/home/{self.user}" self.edittable_workspaces = ["forge"] self.remote_work_dir = "/packages/monarch_default_workspace/workspace/" self.editable_workspace_paths = [ @@ -182,8 +221,6 @@ async def initialize(self) -> None: # This can be removed in the future once this has been removed. 
configure(default_transport=ChannelTransport.MetaTlsWithHostname) - await self.launch_mast_job() - async def get_allocator(self, name: str, num_hosts: int) -> tuple[Any, Any, str]: allocator = MastAllocator( MastAllocatorConfig( @@ -255,11 +292,15 @@ def build_appdef(self) -> specs.AppDef: "TORCHDYNAMO_VERBOSE": "1", "VLLM_TORCH_COMPILE_LEVEL": "0", "VLLM_USE_TRITON_FLASH_ATTN": "0", + "WANDB_MODE": "offline", + "HF_HUB_OFFLINE": "1", + "MONARCH_HOST_MESH_V1_REMOVE_ME_BEFORE_RELEASE": "1", + "TORCHSTORE_RDMA_ENABLED": "1", + "HF_HOME": "/mnt/wsfuse/teamforge/hf", + "TRANSFORMERS_OFFLINE": "1", }, } - print("DEFAULT ENVS: ", default_envs) - packages = Packages() meshes = [] # Process both services and actors configurations @@ -289,6 +330,15 @@ def build_appdef(self) -> specs.AppDef: timeout_sec=self.timeout_sec, env=default_envs, ) + appdef.metadata["mast"] = { + "HpcJobDefinition": { + "networkAffinity": { + # Ensure colocation + "preferredScope": 3, # DC + "fallbackScope": 3, # REGION + }, + }, + } for role in appdef.roles: role.resource.capabilities["server_sub_types"] = [ @@ -296,8 +346,45 @@ def build_appdef(self) -> specs.AppDef: role.resource.capabilities["server_sub_types"][1] # GTT ] + # Add client role to run in MAST if in detached mode + if self.detached: + client_role = self._create_client_role(appdef) + appdef.roles.insert(0, client_role) + return appdef + def _create_client_role(self, appdef: specs.AppDef) -> specs.Role: + # Clone an existing worker role to inherit workspace configuration + if not appdef.roles: + raise ValueError( + "Cannot create client role: no worker roles exist to clone from" + ) + + # Clone the first worker role + client_role = copy.deepcopy(appdef.roles[0]) + + # Override with client-specific configuration + client_role.name = "client" + # Use the bootstrap script as entrypoint + client_role.entrypoint = "workspace/forge/.meta/mast/client_bootstrap.sh" + + # Build args for the client role (passed to the bootstrap script) + # 
These args will be passed to client_bootstrap.sh which forwards them to main.py + args = [ + "--mode=remote", + "--job-name", + self.job_name, + ] + + # Add any extra args passed from the CLI (includes --config and other args) + if self.extra_args: + args.extend(self.extra_args) + + client_role.args = args + client_role.num_replicas = 1 + + return client_role + def create_job_name(self): return f"{self.user}-forge-{uuid.uuid4().hex[:6]}" @@ -315,6 +402,6 @@ def get_launcher(cfg: LauncherConfig | None = None) -> BaseLauncher | None: raise ValueError( "MAST imports did not succeed, cannot launch MAST jobs. Please verify your installation" ) - return Mastlauncher(cfg) + return MastLauncher(cfg, detached=False) else: raise ValueError(f"Unsupported config provided, got {cfg}")