diff --git a/apps/grpo/__init__.py b/apps/grpo/__init__.py new file mode 100644 index 000000000..2e41cd717 --- /dev/null +++ b/apps/grpo/__init__.py @@ -0,0 +1,5 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. diff --git a/apps/grpo/main.py b/apps/grpo/main.py index 7545aa561..2439100d9 100644 --- a/apps/grpo/main.py +++ b/apps/grpo/main.py @@ -7,7 +7,6 @@ # Usage: python -m apps.grpo.main --config apps/grpo/qwen3_1_7b.yaml import asyncio - import time import uuid from dataclasses import dataclass @@ -27,11 +26,20 @@ from forge.actors.trainer import RLTrainer from forge.cli.config import parse from forge.controller.actor import ForgeActor -from forge.controller.provisioner import shutdown +from forge.controller.launcher import JOB_NAME_KEY, LAUNCHER_KEY +from forge.controller.provisioner import init_provisioner, shutdown from forge.data.rewards import MathReward, ThinkingReward from forge.observability.metric_actors import get_or_create_metric_logger from forge.observability.metrics import record_metric, Reduce from forge.observability.perf_tracker import Tracer + +from forge.types import ( + Launcher, + LauncherConfig, + ProcessConfig, + ProvisionerConfig, + ServiceConfig, +) from forge.util.ops import compute_logprobs from monarch.actor import endpoint from omegaconf import DictConfig @@ -312,6 +320,18 @@ async def main(cfg: DictConfig): max_req_tokens = cfg.max_req_tokens max_res_tokens = cfg.max_res_tokens + # init provisioner + await init_provisioner( + ProvisionerConfig( + launcher_config=LauncherConfig( + launcher=Launcher(cfg.get(LAUNCHER_KEY, Launcher.SLURM.value)), + job_name=cfg.get(JOB_NAME_KEY, None), + services={k: ServiceConfig(**v) for k, v in cfg.services.items()}, + actors={k: ProcessConfig(**v) for k, v in cfg.actors.items()}, + ) + ) + ) + # initialize before spawning services metric_logging_cfg = cfg.get("metric_logging", {"console": {"log_per_rank": False}}) mlogger = await get_or_create_metric_logger() diff --git a/apps/mast/README.md b/apps/mast/README.md new file mode 100644 index 000000000..6cd48d32d --- /dev/null +++ b/apps/mast/README.md @@ -0,0 +1,31 @@ +# Forge MAST Environment Setup + +A simple setup script to automatically configure your environment for running Forge with MAST jobs. + +## Quick Start + +### 1. Run the Setup Script + +The `env_setup.sh` script will automatically: +- ✅ Activate the required conda environment (`forge-8448524`) +- ✅ Clone/update the Forge repository +- ✅ Install Forge package dependencies +- ✅ Mount the required oilfs workspace to `/mnt/wsfuse` +- ✅ Configure your environment for MAST job submission + +```bash +# Make the script executable +chmod +x env_setup.sh + +# Run the setup +./apps/mast/env_setup.sh + +``` + +### 2. Submit MAST job + +``` +pip install --force-reinstall --no-deps . && python -m apps.mast.main --config apps/mast/qwen3_1_7b_mast.yaml +``` + +⚠️ Important Note: `pip install --force-reinstall --no-deps .` is required every time you make a change to the local codebase. This ensures your latest changes are installed before job submission. diff --git a/apps/mast/__init__.py b/apps/mast/__init__.py new file mode 100644 index 000000000..2e41cd717 --- /dev/null +++ b/apps/mast/__init__.py @@ -0,0 +1,5 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. 
+# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. diff --git a/apps/mast/env_setup.sh b/apps/mast/env_setup.sh new file mode 100755 index 000000000..4318e05f0 --- /dev/null +++ b/apps/mast/env_setup.sh @@ -0,0 +1,270 @@ +#!/bin/bash + +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. + +# setup_forge_env.sh - Setup conda environment and install forge with mounting +set -e # Exit on any error + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +NC='\033[0m' # No Color + +# Logging functions +log_info() { + echo -e "${GREEN}[INFO]${NC} $1" +} + +log_warn() { + echo -e "${YELLOW}[WARN]${NC} $1" +} + +log_error() { + echo -e "${RED}[ERROR]${NC} $1" +} + +# Function to mount a single workspace to /mnt/wsfuse +mount_workspace() { + local workspace_url="$1" + local mount_dir="/mnt/wsfuse" + + if [ -z "$workspace_url" ]; then + log_error "No workspace URL provided for mounting" + return 1 + fi + + log_info "Setting up mount directory: $mount_dir" + + # Create the directory if it doesn't exist + if [ ! -d "$mount_dir" ]; then + log_info "Creating mount directory: $mount_dir" + sudo mkdir -p "$mount_dir" || { + log_error "Failed to create mount directory (may need sudo privileges)" + return 1 + } + fi + + # Check if the directory is already mounted + if mountpoint -q "$mount_dir" 2>/dev/null; then + log_warn "Directory $mount_dir is already mounted, skipping mount" + return 0 + fi + + # Check if oilfs command exists + if ! command -v oilfs >/dev/null 2>&1; then + log_error "oilfs command not found. 
Please ensure it's installed and in PATH" + return 1 + fi + + log_info "Mounting workspace $workspace_url to $mount_dir" + + # Store original LD_LIBRARY_PATH to restore after mounting (similar to Python code) + original_ld_library_path="${LD_LIBRARY_PATH:-}" + + # Temporarily unset LD_LIBRARY_PATH for mounting + unset LD_LIBRARY_PATH + + # Mount the workspace + if sudo oilfs "$workspace_url" "$mount_dir"; then + log_info "Successfully mounted $workspace_url to $mount_dir" + else + log_error "Failed to mount $workspace_url to $mount_dir" + # Restore original LD_LIBRARY_PATH + if [ -n "$original_ld_library_path" ]; then + export LD_LIBRARY_PATH="$original_ld_library_path" + fi + return 1 + fi + + # Restore original LD_LIBRARY_PATH + if [ -n "$original_ld_library_path" ]; then + export LD_LIBRARY_PATH="$original_ld_library_path" + fi + + # Verify mount was successful + if [ -d "$mount_dir/huggingface_models" ]; then + log_info "Mount verification successful - found expected directory structure" + else + log_warn "Mount verification: Expected directory structure not found, but mount appears successful" + fi + + return 0 +} + +# Function to safely deactivate conda +safe_conda_deactivate() { + if command -v conda >/dev/null 2>&1; then + if conda info --envs >/dev/null 2>&1; then + conda deactivate 2>/dev/null || log_warn "Could not deactivate conda (might not be in an environment)" + else + log_warn "Conda not properly initialized, skipping deactivate" + fi + else + log_warn "Conda command not found, skipping deactivate" + fi +} + +# Function to safely activate conda environment +safe_conda_activate() { + local env_name="$1" + + if command -v conda >/dev/null 2>&1; then + if conda info --envs >/dev/null 2>&1; then + conda activate "$env_name" + else + log_warn "Conda not properly initialized" + log_info "Attempting to use xl_conda.sh activation instead..." + source "$CONDA_SCRIPT_PATH" activate "$env_name" + fi + else + log_warn "Conda command not found" + log_info "Attempting to use xl_conda.sh activation instead..." + source "$CONDA_SCRIPT_PATH" activate "$env_name" + fi +} + +# Check if required environment variables are set +if [ -z "$USER" ]; then + log_error "USER environment variable is not set" + exit 1 +fi + +# Define paths +FBSOURCE_PATH="/data/users/$USER/fbsource" +CONDA_SCRIPT_PATH="$FBSOURCE_PATH/genai/xlformers/dev/xl_conda.sh" +FORGE_BASE_DIR="/data/users/$USER" +FORGE_REPO_DIR="$FORGE_BASE_DIR/forge" + +# Workspace URL for mounting +WORKSPACE_URL="ws://ws.ai.pci0ai/genai_fair_llm" + +log_info "Starting forge environment setup for user: $USER" + +# Step 1: Mount workspace (do this early in case other steps need the mounted files) +log_info "Step 1: Mounting workspace..." +mount_workspace "$WORKSPACE_URL" +if [ $? -ne 0 ]; then + log_warn "Failed to mount workspace, continuing with setup..." + log_warn "Some functionality may not be available without the mounted workspace" +fi + +# Step 2: Check if conda script exists and source it +log_info "Step 2: Activating conda environment..." +if [ ! -f "$CONDA_SCRIPT_PATH" ]; then + log_error "Conda script not found at: $CONDA_SCRIPT_PATH" + log_error "Please ensure fbsource is properly set up" + exit 1 +fi + +log_info "Sourcing conda script: $CONDA_SCRIPT_PATH" +source "$CONDA_SCRIPT_PATH" activate forge:e146614 + +if [ $? 
-ne 0 ]; then + log_error "Failed to activate conda environment forge-e146614" + exit 1 +fi + +log_info "Conda environment activated successfully" + +# Step 3: Create and navigate to forge base directory +log_info "Step 3: Setting up forge directory..." +if [ ! -d "$FORGE_BASE_DIR" ]; then + log_info "Creating forge base directory: $FORGE_BASE_DIR" + mkdir -p "$FORGE_BASE_DIR" +fi + +cd "$FORGE_BASE_DIR" +log_info "Changed to directory: $(pwd)" + +# Step 4: Clone or update forge repository +log_info "Step 4: Setting up forge git repository..." +if [ -d "$FORGE_REPO_DIR" ]; then + log_warn "Forge repository already exists at: $FORGE_REPO_DIR" + cd "$FORGE_REPO_DIR" + + if [ -d ".git" ]; then + log_info "Updating existing repository..." + # test the command directly: under `set -e` a separate `$?` check after a failing command never runs + if git fetch origin; then + log_info "Repository updated successfully" + else + log_warn "Failed to fetch updates, continuing with existing code" + fi + else + log_error "Directory exists but is not a git repository" + log_info "Removing directory and cloning fresh..." + cd "$FORGE_BASE_DIR" + rm -rf "$FORGE_REPO_DIR" + if ! git clone git@github.com:meta-pytorch/forge.git; then + log_error "Failed to clone forge repository" + exit 1 + fi + cd "$FORGE_REPO_DIR" + fi +else + log_info "Cloning forge repository..." + if ! git clone git@github.com:meta-pytorch/forge.git; then + log_error "Failed to clone forge repository" + log_error "Please ensure:" + log_error "1. You have SSH access to github.com" + log_error "2. Your SSH key is added to GitHub" + log_error "3. You have access to the meta-pytorch/forge repository" + exit 1 + fi + cd "$FORGE_REPO_DIR" +fi + +log_info "Current directory: $(pwd)" + +# Step 5: Install forge package +log_info "Step 5: Installing forge package..." +if ! pip install --no-deps --force-reinstall .; then + log_error "Failed to install forge package" + exit 1 +fi +log_info "Forge package installed successfully" + +log_info "Environment activation completed" + +# Final verification +log_info "Setup completed successfully!" + +# Check mount status +if mountpoint -q "/mnt/wsfuse" 2>/dev/null; then + log_info "Workspace mount: ✓ Active at /mnt/wsfuse" +else + log_warn "Workspace mount: ✗ Not mounted" +fi + +# Check current environment +if command -v conda >/dev/null 2>&1 && conda info --envs >/dev/null 2>&1; then + CURRENT_ENV=$(conda info --show-active-prefix 2>/dev/null | sed 's/.*\///' || echo "unknown") + log_info "Current conda environment: $CURRENT_ENV" +else + log_info "Current environment: Using xl_conda.sh managed environment" +fi + +log_info "Current directory: $(pwd)" +log_info "Python location: $(which python)" + +# Show installed packages +log_info "Key installed packages:" +pip list | grep -E "(forge|monarch)" || log_warn "No forge/monarch packages found in pip list" + +log_info "Environment setup complete! You can now run your scripts." +log_info "Mounted workspace available at: /mnt/wsfuse" + +# Step 6: Ask the user to re-activate the conda environment +echo "" +log_info "Installation completed successfully!" +echo "" +log_info "Re-activate the conda environment to make the changes take effect:" +log_info "conda deactivate && conda activate forge-e146614" diff --git a/apps/mast/main.py b/apps/mast/main.py new file mode 100644 index 000000000..cd5de0be9 --- /dev/null +++ b/apps/mast/main.py @@ -0,0 +1,67 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved.
+# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. + +import asyncio +import getpass +import uuid + +from apps.grpo.main import main as grpo_main +from forge.cli.config import parse +from forge.controller.launcher import JOB_NAME_KEY, LAUNCHER_KEY +from forge.controller.provisioner import init_provisioner + +from forge.types import ( + Launcher, + LauncherConfig, + ProcessConfig, + ProvisionerConfig, + ServiceConfig, +) +from omegaconf import DictConfig + +DEFAULT_CHECKPOINT_FOLDER_KEY = "checkpoint_folder" +DEFAULT_CHECKPOINT_FOLDER = "/mnt/wsfuse/teamforge/forge_runs/" + + +async def main(cfg: DictConfig): + """Main entry point for launching MAST jobs for GRPO training.""" + if cfg.get(LAUNCHER_KEY, Launcher.MAST.value) != Launcher.MAST.value: + raise ValueError("Launcher must be MAST.") + + if cfg.get(JOB_NAME_KEY, None) is not None: + # prepend the user name to the job name to avoid collisions + cfg[JOB_NAME_KEY] = f"{getpass.getuser()}-{cfg[JOB_NAME_KEY]}" + print(f"Overriding mast job name to {cfg[JOB_NAME_KEY]}") + + if cfg.get(DEFAULT_CHECKPOINT_FOLDER_KEY, DEFAULT_CHECKPOINT_FOLDER) is not None: + # append the job name and a short GUID to the checkpoint folder path to avoid collisions + if cfg[DEFAULT_CHECKPOINT_FOLDER_KEY] == DEFAULT_CHECKPOINT_FOLDER: + cfg[ + DEFAULT_CHECKPOINT_FOLDER_KEY + ] = f"{cfg[DEFAULT_CHECKPOINT_FOLDER_KEY]}{cfg[JOB_NAME_KEY]}-{uuid.uuid4().hex[:6]}" + print(f"Overriding checkpoint folder to {cfg[DEFAULT_CHECKPOINT_FOLDER_KEY]}") + + # init the MAST provisioner + await init_provisioner( + ProvisionerConfig( + launcher_config=LauncherConfig( + launcher=Launcher(cfg.get(LAUNCHER_KEY, Launcher.MAST.value)), + job_name=cfg.get(JOB_NAME_KEY, None), + services={k: ServiceConfig(**v) for k, v in cfg.services.items()}, + actors={k: ProcessConfig(**v) for k, v in cfg.actors.items()}, + ) + ) + ) + await grpo_main(cfg) + + +if __name__ == "__main__": + + @parse + def _main(cfg): + asyncio.run(main(cfg)) + + _main() # @parse grabs the cfg from CLI diff --git a/apps/mast/qwen3_14b_mast.yaml b/apps/mast/qwen3_14b_mast.yaml new file mode 100644 index 000000000..83d5b8103 --- /dev/null +++ b/apps/mast/qwen3_14b_mast.yaml @@ -0,0 +1,162 @@ +# Grouped Relative Policy Optimization (GRPO) +# >>> python -m apps.mast.main --config apps/mast/qwen3_14b_mast.yaml + +# Global configuration +group_size: 8 +batch_size: 16 +max_req_tokens: 512 +max_res_tokens: 512 +model: "Qwen/Qwen3-14B" +off_by_n: 1 # Off by one by default +launcher: mast +job_name: forge-qwen3-14b +checkpoint_folder: /mnt/wsfuse/teamforge/forge_runs/ + +# Main loop configuration +rollout_threads: ${services.policy.num_replicas} # Recommended to set equal to policy.num_replicas + +# Observability configuration +metric_logging: + wandb: + project: "grpo-training" + group: "grpo_exp_${oc.env:USER}" + reduce_across_ranks: True + console: + reduce_across_ranks: True + +# Dataset configuration +dataset: + path: "openai/gsm8k" + revision: "main" + data_split: "train" + streaming: true + model: ${model} + +# Policy configuration +policy: + engine_config: + model: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-14B/snapshots/8268fe3026cb304910457689366670e803a6fd56 + tensor_parallel_size: 2 + pipeline_parallel_size: 1 + enforce_eager: false + # TODO: Had to disable this because vLLM didn't like it; + # needs to be revisited.
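+ # Disabling the custom all-reduce kernel makes vLLM fall back to NCCL for tensor-parallel collectives (assumed explanation of the incompatibility noted above).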
+ disable_custom_all_reduce: true + sampling_config: + n: ${group_size} + max_tokens: ${max_res_tokens} + temperature: 1.0 + top_p: 1.0 + +# Trainer configuration +trainer: + model: + name: qwen3 + flavor: 14B + hf_assets_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-14B/snapshots/8268fe3026cb304910457689366670e803a6fd56 + optimizer: + name: AdamW + lr: 1e-5 + eps: 1e-8 + lr_scheduler: + warmup_steps: 1 + training: + local_batch_size: ${batch_size} + seq_len: 2048 + max_norm: 1.0 + steps: 1000000 + dtype: bfloat16 + gc_freq: 1 + compile: + enable: false + parallelism: + data_parallel_replicate_degree: 1 + data_parallel_shard_degree: 4 + tensor_parallel_degree: 2 + pipeline_parallel_degree: 1 + context_parallel_degree: 1 + expert_parallel_degree: 1 + disable_loss_parallel: true + checkpoint: + enable: true + initial_load_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-14B/snapshots/8268fe3026cb304910457689366670e803a6fd56 + initial_load_in_hf: true + last_save_in_hf: true + interval: 500 + async_mode: "disabled" + activation_checkpoint: + mode: selective + selective_ac_option: op + comm: + # TODO: needs to be revisited; causes NCCL timeouts on init when loading the checkpoint + # from oilfs if the trainer is not in the same region as oilfs + init_timeout_seconds: 1200 + dcp_path: ${checkpoint_folder} + +# Replay buffer configuration +replay_buffer: + batch_size: ${batch_size} + max_policy_age: ${off_by_n} + dp_size: ${trainer.parallelism.data_parallel_shard_degree} # Must equal trainer DP degree + +# Reference model configuration +ref_model: + model: + name: qwen3 + flavor: 14B + hf_assets_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-14B/snapshots/8268fe3026cb304910457689366670e803a6fd56 + training: + dtype: bfloat16 + gc_freq: 1 + compile: + enable: false + parallelism: + data_parallel_replicate_degree: 1 + data_parallel_shard_degree: 1 + tensor_parallel_degree: 1 + pipeline_parallel_degree: 1 + context_parallel_degree: 1 + expert_parallel_degree: 1 + checkpoint: + enable: true + initial_load_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-14B/snapshots/8268fe3026cb304910457689366670e803a6fd56 + initial_load_in_hf: true + +# All resource allocations +services: + policy: + procs: ${policy.engine_config.tensor_parallel_size} + num_replicas: 2 + with_gpus: true + mesh_name: policy + hosts: 1 + ref_model: + procs: 1 + num_replicas: 2 + with_gpus: true + mesh_name: ref_model + hosts: 1 + reward_actor: + procs: 1 + num_replicas: 1 + with_gpus: false + mesh_name: reward_actor + +actors: + dataset: + procs: 1 + with_gpus: false + mesh_name: dataset + trainer: + procs: 8 + with_gpus: true + mesh_name: trainer + hosts: 1 + replay_buffer: + procs: 1 + with_gpus: false + mesh_name: replay_buffer + compute_advantages: + procs: 1 + with_gpus: false + mesh_name: compute_advantages diff --git a/apps/mast/qwen3_1_7b_mast.yaml b/apps/mast/qwen3_1_7b_mast.yaml new file mode 100644 index 000000000..58d879579 --- /dev/null +++ b/apps/mast/qwen3_1_7b_mast.yaml @@ -0,0 +1,162 @@ +# Grouped Relative Policy Optimization (GRPO) +# >>> python -m apps.mast.main --config apps/mast/qwen3_1_7b_mast.yaml + +# Global configuration +group_size: 8 +batch_size: 16 +max_req_tokens: 512 +max_res_tokens: 512 +model: "Qwen/Qwen3-1.7B" +off_by_n: 1 # Off by one by default +launcher: mast +job_name: forge-qwen3-1_7b +checkpoint_folder: /mnt/wsfuse/teamforge/forge_runs/ + +# Main loop configuration +rollout_threads: ${services.policy.num_replicas} # Recommended to set equal to policy.num_replicas + +#
Observability configuration +metric_logging: + wandb: + project: "grpo-training" + group: "grpo_exp_${oc.env:USER}" + reduce_across_ranks: True + console: + reduce_across_ranks: True + +# Dataset configuration +dataset: + path: "openai/gsm8k" + revision: "main" + data_split: "train" + streaming: true + model: ${model} + +# Policy configuration +policy: + engine_config: + model: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-1.7B/snapshots/0060bc56d46589041c1048efd1a397421b1142b5 + tensor_parallel_size: 1 + pipeline_parallel_size: 1 + enforce_eager: false + # TODO: Had to disable this because vLLM didn't like it; + # needs to be revisited. + disable_custom_all_reduce: true + sampling_config: + n: ${group_size} + max_tokens: ${max_res_tokens} + temperature: 1.0 + top_p: 1.0 + +# Trainer configuration +trainer: + model: + name: qwen3 + flavor: 1.7B + hf_assets_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-1.7B/snapshots/0060bc56d46589041c1048efd1a397421b1142b5 + optimizer: + name: AdamW + lr: 1e-5 + eps: 1e-8 + lr_scheduler: + warmup_steps: 1 + training: + local_batch_size: ${batch_size} + seq_len: 2048 + max_norm: 1.0 + steps: 1000000 + dtype: bfloat16 + gc_freq: 1 + compile: + enable: false + parallelism: + data_parallel_replicate_degree: 1 + data_parallel_shard_degree: 1 + tensor_parallel_degree: 1 + pipeline_parallel_degree: 1 + context_parallel_degree: 1 + expert_parallel_degree: 1 + disable_loss_parallel: true + checkpoint: + enable: true + initial_load_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-1.7B/snapshots/0060bc56d46589041c1048efd1a397421b1142b5 + initial_load_in_hf: true + last_save_in_hf: true + interval: 500 + async_mode: "disabled" + activation_checkpoint: + mode: selective + selective_ac_option: op + comm: + # TODO: needs to be revisited;
causes NCCL timeouts on init when loading the checkpoint + # from oilfs if the trainer is not in the same region as oilfs + init_timeout_seconds: 1200 + dcp_path: ${checkpoint_folder} + +# Replay buffer configuration +replay_buffer: + batch_size: ${batch_size} + max_policy_age: ${off_by_n} + dp_size: ${trainer.parallelism.data_parallel_shard_degree} # Must equal trainer DP degree + +# Reference model configuration +ref_model: + model: + name: qwen3 + flavor: 1.7B + hf_assets_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-1.7B/snapshots/0060bc56d46589041c1048efd1a397421b1142b5 + training: + dtype: bfloat16 + gc_freq: 1 + compile: + enable: false + parallelism: + data_parallel_replicate_degree: 1 + data_parallel_shard_degree: 1 + tensor_parallel_degree: 1 + pipeline_parallel_degree: 1 + context_parallel_degree: 1 + expert_parallel_degree: 1 + checkpoint: + enable: true + initial_load_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-1.7B/snapshots/0060bc56d46589041c1048efd1a397421b1142b5 + initial_load_in_hf: true + +# All resource allocations +services: + policy: + procs: ${policy.engine_config.tensor_parallel_size} + num_replicas: 2 + with_gpus: true + mesh_name: policy + hosts: 1 + ref_model: + procs: 1 + num_replicas: 2 + with_gpus: true + mesh_name: ref_model + hosts: 1 + reward_actor: + procs: 1 + num_replicas: 1 + with_gpus: false + mesh_name: reward_actor + +actors: + dataset: + procs: 1 + with_gpus: false + mesh_name: dataset + trainer: + procs: 1 + with_gpus: true + mesh_name: trainer + hosts: 1 + replay_buffer: + procs: 1 + with_gpus: false + mesh_name: replay_buffer + compute_advantages: + procs: 1 + with_gpus: false + mesh_name: compute_advantages diff --git a/apps/mast/qwen3_32b_mast.yaml b/apps/mast/qwen3_32b_mast.yaml new file mode 100644 index 000000000..0db8f4af3 --- /dev/null +++ b/apps/mast/qwen3_32b_mast.yaml @@ -0,0 +1,162 @@ +# Grouped Relative Policy Optimization (GRPO) +# >>> python -m apps.mast.main --config apps/mast/qwen3_32b_mast.yaml + +# Global configuration +group_size: 8 +batch_size: 16 +max_req_tokens: 512 +max_res_tokens: 512 +model: "Qwen/Qwen3-32B" +off_by_n: 1 # Off by one by default +launcher: mast +job_name: forge-qwen3-32b +checkpoint_folder: /mnt/wsfuse/teamforge/forge_runs/ + +# Main loop configuration +rollout_threads: ${services.policy.num_replicas} # Recommended to set equal to policy.num_replicas + +# Observability configuration +metric_logging: + wandb: + project: "grpo-training" + group: "grpo_exp_${oc.env:USER}" + reduce_across_ranks: True + console: + reduce_across_ranks: True + +# Dataset configuration +dataset: + path: "openai/gsm8k" + revision: "main" + data_split: "train" + streaming: true + model: ${model} + +# Policy configuration +policy: + engine_config: + model: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-32B/snapshots/d47b0d4ae4b48fde975756bf360a63a9cca8d470 + tensor_parallel_size: 2 + pipeline_parallel_size: 1 + enforce_eager: false + # TODO: Had to disable this because vLLM didn't like it; + # needs to be revisited.
+ disable_custom_all_reduce: true + sampling_config: + n: ${group_size} + max_tokens: ${max_res_tokens} + temperature: 1.0 + top_p: 1.0 + +# Trainer configuration +trainer: + model: + name: qwen3 + flavor: 32B + hf_assets_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-32B/snapshots/d47b0d4ae4b48fde975756bf360a63a9cca8d470 + optimizer: + name: AdamW + lr: 1e-5 + eps: 1e-8 + lr_scheduler: + warmup_steps: 1 + training: + local_batch_size: ${batch_size} + seq_len: 2048 + max_norm: 1.0 + steps: 1000000 + dtype: bfloat16 + gc_freq: 1 + compile: + enable: false + parallelism: + data_parallel_replicate_degree: 1 + data_parallel_shard_degree: 4 + tensor_parallel_degree: 2 + pipeline_parallel_degree: 1 + context_parallel_degree: 1 + expert_parallel_degree: 1 + disable_loss_parallel: true + checkpoint: + enable: true + initial_load_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-32B/snapshots/d47b0d4ae4b48fde975756bf360a63a9cca8d470 + initial_load_in_hf: true + last_save_in_hf: true + interval: 500 + async_mode: "disabled" + activation_checkpoint: + mode: selective + selective_ac_option: op + comm: + # TODO: needs to be revisited; causes NCCL timeouts on init when loading the checkpoint + # from oilfs if the trainer is not in the same region as oilfs + init_timeout_seconds: 1200 + dcp_path: ${checkpoint_folder} + +# Replay buffer configuration +replay_buffer: + batch_size: ${batch_size} + max_policy_age: ${off_by_n} + dp_size: ${trainer.parallelism.data_parallel_shard_degree} # Must equal trainer DP degree + +# Reference model configuration +ref_model: + model: + name: qwen3 + flavor: 32B + hf_assets_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-32B/snapshots/d47b0d4ae4b48fde975756bf360a63a9cca8d470 + training: + dtype: bfloat16 + gc_freq: 1 + compile: + enable: false + parallelism: + data_parallel_replicate_degree: 1 + data_parallel_shard_degree: 1 + tensor_parallel_degree: 2 + pipeline_parallel_degree: 1 + context_parallel_degree: 1 + expert_parallel_degree: 1 + checkpoint: + enable: true + initial_load_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-32B/snapshots/d47b0d4ae4b48fde975756bf360a63a9cca8d470 + initial_load_in_hf: true + +# All resource allocations +services: + policy: + procs: ${policy.engine_config.tensor_parallel_size} + num_replicas: 2 + with_gpus: true + mesh_name: policy + hosts: 1 + ref_model: + procs: 1 + num_replicas: 2 + with_gpus: true + mesh_name: ref_model + hosts: 1 + reward_actor: + procs: 1 + num_replicas: 1 + with_gpus: false + mesh_name: reward_actor + +actors: + dataset: + procs: 1 + with_gpus: false + mesh_name: dataset + trainer: + procs: 8 + with_gpus: true + mesh_name: trainer + hosts: 1 + replay_buffer: + procs: 1 + with_gpus: false + mesh_name: replay_buffer + compute_advantages: + procs: 1 + with_gpus: false + mesh_name: compute_advantages diff --git a/apps/mast/qwen3_4b_mast.yaml b/apps/mast/qwen3_4b_mast.yaml new file mode 100644 index 000000000..92119055a --- /dev/null +++ b/apps/mast/qwen3_4b_mast.yaml @@ -0,0 +1,162 @@ +# Grouped Relative Policy Optimization (GRPO) +# >>> python -m apps.mast.main --config apps/mast/qwen3_4b_mast.yaml + +# Global configuration +group_size: 8 +batch_size: 16 +max_req_tokens: 512 +max_res_tokens: 512 +model: "Qwen/Qwen3-4B" +off_by_n: 1 # Off by one by default +launcher: mast +job_name: forge-qwen3-4b +checkpoint_folder: /mnt/wsfuse/teamforge/forge_runs/ + +# Main loop configuration +rollout_threads: ${services.policy.num_replicas} # Recommended to set equal to policy.num_replicas + +#
Observability configuration +metric_logging: + wandb: + project: "grpo-training" + group: "grpo_exp_${oc.env:USER}" + reduce_across_ranks: True + console: + reduce_across_ranks: True + +# Dataset configuration +dataset: + path: "openai/gsm8k" + revision: "main" + data_split: "train" + streaming: true + model: ${model} + +# Policy configuration +policy: + engine_config: + model: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-4B-Base/snapshots/a81b894c2624d21c88a3ad737ce4f837424b7eed + tensor_parallel_size: 2 + pipeline_parallel_size: 1 + enforce_eager: false + # TODO: Had to disable this because vLLM didn't like it; + # needs to be revisited. + disable_custom_all_reduce: true + sampling_config: + n: ${group_size} + max_tokens: ${max_res_tokens} + temperature: 1.0 + top_p: 1.0 + +# Trainer configuration +trainer: + model: + name: qwen3 + flavor: 4B + hf_assets_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-4B-Base/snapshots/a81b894c2624d21c88a3ad737ce4f837424b7eed + optimizer: + name: AdamW + lr: 1e-5 + eps: 1e-8 + lr_scheduler: + warmup_steps: 1 + training: + local_batch_size: ${batch_size} + seq_len: 2048 + max_norm: 1.0 + steps: 1000000 + dtype: bfloat16 + gc_freq: 1 + compile: + enable: false + parallelism: + data_parallel_replicate_degree: 1 + data_parallel_shard_degree: 4 + tensor_parallel_degree: 2 + pipeline_parallel_degree: 1 + context_parallel_degree: 1 + expert_parallel_degree: 1 + disable_loss_parallel: true + checkpoint: + enable: true + initial_load_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-4B-Base/snapshots/a81b894c2624d21c88a3ad737ce4f837424b7eed + initial_load_in_hf: true + last_save_in_hf: true + interval: 500 + async_mode: "disabled" + activation_checkpoint: + mode: selective + selective_ac_option: op + comm: + # TODO: needs to be revisited;
causes NCCL timeouts on init when loading the checkpoint + # from oilfs if the trainer is not in the same region as oilfs + init_timeout_seconds: 1200 + dcp_path: ${checkpoint_folder} + +# Replay buffer configuration +replay_buffer: + batch_size: ${batch_size} + max_policy_age: ${off_by_n} + dp_size: ${trainer.parallelism.data_parallel_shard_degree} # Must equal trainer DP degree + +# Reference model configuration +ref_model: + model: + name: qwen3 + flavor: 4B + hf_assets_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-4B-Base/snapshots/a81b894c2624d21c88a3ad737ce4f837424b7eed + training: + dtype: bfloat16 + gc_freq: 1 + compile: + enable: false + parallelism: + data_parallel_replicate_degree: 1 + data_parallel_shard_degree: 1 + tensor_parallel_degree: 1 + pipeline_parallel_degree: 1 + context_parallel_degree: 1 + expert_parallel_degree: 1 + checkpoint: + enable: true + initial_load_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-4B-Base/snapshots/a81b894c2624d21c88a3ad737ce4f837424b7eed + initial_load_in_hf: true + +# All resource allocations +services: + policy: + procs: ${policy.engine_config.tensor_parallel_size} + num_replicas: 2 + with_gpus: true + mesh_name: policy + hosts: 1 + ref_model: + procs: 1 + num_replicas: 2 + with_gpus: true + mesh_name: ref_model + hosts: 1 + reward_actor: + procs: 1 + num_replicas: 1 + with_gpus: false + mesh_name: reward_actor + +actors: + dataset: + procs: 1 + with_gpus: false + mesh_name: dataset + trainer: + procs: 8 + with_gpus: true + mesh_name: trainer + hosts: 1 + replay_buffer: + procs: 1 + with_gpus: false + mesh_name: replay_buffer + compute_advantages: + procs: 1 + with_gpus: false + mesh_name: compute_advantages diff --git a/apps/mast/qwen3_8b_mast.yaml b/apps/mast/qwen3_8b_mast.yaml new file mode 100644 index 000000000..7f2f99694 --- /dev/null +++ b/apps/mast/qwen3_8b_mast.yaml @@ -0,0 +1,162 @@ +# Grouped Relative Policy Optimization (GRPO) +# >>> python -m apps.mast.main --config apps/mast/qwen3_8b_mast.yaml + +# Global configuration +group_size: 8 +batch_size: 16 +max_req_tokens: 512 +max_res_tokens: 512 +model: "Qwen/Qwen3-8B" +off_by_n: 1 # Off by one by default +launcher: mast +job_name: forge-qwen3-8b +checkpoint_folder: /mnt/wsfuse/teamforge/forge_runs/ + +# Main loop configuration +rollout_threads: ${services.policy.num_replicas} # Recommended to set equal to policy.num_replicas + +# Observability configuration +metric_logging: + wandb: + project: "grpo-training" + group: "grpo_exp_${oc.env:USER}" + reduce_across_ranks: True + console: + reduce_across_ranks: True + +# Dataset configuration +dataset: + path: "openai/gsm8k" + revision: "main" + data_split: "train" + streaming: true + model: ${model} + +# Policy configuration +policy: + engine_config: + model: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-8B/snapshots/model + tensor_parallel_size: 2 + pipeline_parallel_size: 1 + enforce_eager: false + # TODO: Had to disable this because vLLM didn't like it; + # needs to be revisited.
+ disable_custom_all_reduce: true + sampling_config: + n: ${group_size} + max_tokens: ${max_res_tokens} + temperature: 1.0 + top_p: 1.0 + +# Trainer configuration +trainer: + model: + name: qwen3 + flavor: 8B + hf_assets_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-8B/snapshots/model + optimizer: + name: AdamW + lr: 1e-5 + eps: 1e-8 + lr_scheduler: + warmup_steps: 1 + training: + local_batch_size: ${batch_size} + seq_len: 2048 + max_norm: 1.0 + steps: 1000000 + dtype: bfloat16 + gc_freq: 1 + compile: + enable: false + parallelism: + data_parallel_replicate_degree: 1 + data_parallel_shard_degree: 4 + tensor_parallel_degree: 2 + pipeline_parallel_degree: 1 + context_parallel_degree: 1 + expert_parallel_degree: 1 + disable_loss_parallel: true + checkpoint: + enable: true + initial_load_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-8B/snapshots/model + initial_load_in_hf: true + last_save_in_hf: true + interval: 500 + async_mode: "disabled" + activation_checkpoint: + mode: selective + selective_ac_option: op + comm: + # TODO: needs to be revisited; causes NCCL timeouts on init when loading the checkpoint + # from oilfs if the trainer is not in the same region as oilfs + init_timeout_seconds: 1200 + dcp_path: ${checkpoint_folder} + +# Replay buffer configuration +replay_buffer: + batch_size: ${batch_size} + max_policy_age: ${off_by_n} + dp_size: ${trainer.parallelism.data_parallel_shard_degree} # Must equal trainer DP degree + +# Reference model configuration +ref_model: + model: + name: qwen3 + flavor: 8B + hf_assets_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-8B/snapshots/model + training: + dtype: bfloat16 + gc_freq: 1 + compile: + enable: false + parallelism: + data_parallel_replicate_degree: 1 + data_parallel_shard_degree: 1 + tensor_parallel_degree: 1 + pipeline_parallel_degree: 1 + context_parallel_degree: 1 + expert_parallel_degree: 1 + checkpoint: + enable: true + initial_load_path: /mnt/wsfuse/huggingface_models/models--Qwen--Qwen3-8B/snapshots/model + initial_load_in_hf: true + +# All resource allocations +services: + policy: + procs: ${policy.engine_config.tensor_parallel_size} + num_replicas: 2 + with_gpus: true + mesh_name: policy + hosts: 1 + ref_model: + procs: 1 + num_replicas: 2 + with_gpus: true + mesh_name: ref_model + hosts: 1 + reward_actor: + procs: 1 + num_replicas: 1 + with_gpus: false + mesh_name: reward_actor + +actors: + dataset: + procs: 1 + with_gpus: false + mesh_name: dataset + trainer: + procs: 8 + with_gpus: true + mesh_name: trainer + hosts: 1 + replay_buffer: + procs: 1 + with_gpus: false + mesh_name: replay_buffer + compute_advantages: + procs: 1 + with_gpus: false + mesh_name: compute_advantages diff --git a/src/forge/actors/policy.py b/src/forge/actors/policy.py index 464674f2c..4b61f096c 100644 --- a/src/forge/actors/policy.py +++ b/src/forge/actors/policy.py @@ -18,7 +18,6 @@ import torch import torch.distributed.checkpoint as dcp import torchstore as ts - from monarch.actor import current_rank, endpoint, ProcMesh from torchstore.state_dict_utils import DELIM from vllm.config import VllmConfig @@ -173,6 +172,7 @@ async def launch( # pyright: ignore[reportIncompatibleMethodOverride] procs=cls.procs, hosts=cls.hosts, with_gpus=cls.with_gpus, + mesh_name=cls.mesh_name, ) worker_procs = await get_proc_mesh(process_config=process_config) diff --git a/src/forge/controller/actor.py b/src/forge/controller/actor.py index f9790ffd3..bb495b641 100644 --- a/src/forge/controller/actor.py +++ b/src/forge/controller/actor.py @@ -26,6 +26,7
@@ class ForgeActor(Actor): hosts: int | None = None with_gpus: bool = False num_replicas: int = 1 + mesh_name: str | None = None _extra_config: dict[str, Any] = {} def __init__(self, *args, **kwargs): @@ -58,6 +59,7 @@ def options( hosts: int | None = None, with_gpus: bool = False, num_replicas: int = 1, + mesh_name: str | None = None, **kwargs, ) -> Type[T]: """ @@ -91,6 +93,7 @@ def options( "hosts": hosts, "with_gpus": with_gpus, "num_replicas": num_replicas, + "mesh_name": mesh_name, "_extra_config": kwargs, } @@ -116,6 +119,7 @@ async def as_service( "hosts": cls.hosts, "with_gpus": cls.with_gpus, "num_replicas": cls.num_replicas, + "mesh_name": cls.mesh_name, **cls._extra_config, # all extra fields } cfg = ServiceConfig(**cfg_kwargs) @@ -181,6 +185,7 @@ async def launch(cls, *args, **kwargs) -> "ForgeActor": procs=cls.procs, hosts=cls.hosts, with_gpus=cls.with_gpus, + mesh_name=cls.mesh_name, ) proc_mesh = await get_proc_mesh(process_config=cfg) diff --git a/src/forge/controller/launcher.py b/src/forge/controller/launcher.py new file mode 100644 index 000000000..cd54c00b0 --- /dev/null +++ b/src/forge/controller/launcher.py @@ -0,0 +1,314 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. + +import getpass +import os +import socket +import subprocess +import uuid +from typing import Any + +import monarch + +import torchx.specs as specs + +from monarch._rust_bindings.monarch_hyperactor.alloc import AllocConstraints +from monarch._src.actor.allocator import RemoteAllocator, TorchXRemoteAllocInitializer +from monarch.actor import Actor, endpoint, ProcMesh +from monarch.tools import commands +from monarch.tools.commands import info +from monarch.tools.components import hyperactor +from monarch.tools.config import Config, Workspace + +from forge.types import Launcher, LauncherConfig + +try: + from monarch._src.actor.actor_mesh import current_rank + from monarch._src.actor.meta.allocator import MastAllocator, MastAllocatorConfig + from monarch.tools.components.meta import hyperactor as meta_hyperactor + from torchx.specs import AppState + from torchx.specs.fb.component_helpers import Packages +except ImportError as e: + print(f"Warning: Monarch meta/fb internal imports failed: {e}") + print("Monarch functionality will be limited") + +JOB_NAME_KEY = "job_name" +LAUNCHER_KEY = "launcher" + + +def _get_port() -> str: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("localhost", 0)) + addr = s.getsockname() + port = addr[1] + return str(port) + + +class SetupActor(Actor): + @endpoint + def get_info(self) -> tuple[str, str]: + return socket.gethostname(), _get_port() + + +class MastSetupActor(SetupActor): + @endpoint + def mount(self, mount_dst: str): + point = current_rank() + # The last dimension is the local proc count.
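+ # e.g. with extent labels ["hosts", "gpus"] and 8 procs per host, proc_count is 8, + # so only local rank 0 on each host proceeds to mount below.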
+ last_label = point.extent.labels[-1] + proc_count = point.size(last_label) + if current_rank().rank % proc_count != 0: + # Only use one rank per host to mount the directory + return + self.mount_mnt_directory(mount_dst) + + def mount_mnt_directory(self, mount_dst: str) -> None: + # Sanity check of the mounted directory + sanity_path = os.path.join(mount_dst, "huggingface_models/") + if os.path.exists(sanity_path): + print(f"Found directory {sanity_path}; skip mounting.") + return + + # Otherwise, mount the directory + if not os.path.exists(mount_dst): + os.makedirs(mount_dst, exist_ok=True) + + # Store original LD_LIBRARY_PATH to restore after mounting + original_ld_library_path = os.environ.get("LD_LIBRARY_PATH", "") + + try: + clean_env = os.environ.copy() + if "LD_LIBRARY_PATH" in clean_env: + del clean_env["LD_LIBRARY_PATH"] + + subprocess.run( + [ + "/packages/oil.oilfs/oilfs-wrapper", + "ws://ws.ai.pci0ai/genai_fair_llm", + mount_dst, + ], + capture_output=True, + text=True, + check=True, + env=clean_env, + ) + print("Done mounting") + except subprocess.CalledProcessError as e: + print( + f"Got error during mounting: {e}, Stderr: {e.stderr}, Stdout: {e.stdout}" + ) + finally: + # Restore original LD_LIBRARY_PATH + if original_ld_library_path: + os.environ["LD_LIBRARY_PATH"] = original_ld_library_path + elif "LD_LIBRARY_PATH" in os.environ: + del os.environ["LD_LIBRARY_PATH"] + + assert os.path.exists( + sanity_path + ), f"Did not find directory {sanity_path}; something went wrong with mounting." + + +class BaseLauncher: + async def initialize(self) -> None: + pass + + async def get_allocator(self, name: str, num_hosts: int) -> tuple[Any, Any, str]: + pass + + async def remote_setup(self, procs: ProcMesh) -> tuple[str, int]: + pass + + +class Slurmlauncher(BaseLauncher): + async def initialize(self) -> None: + pass + + async def get_allocator(self, name: str, num_hosts: int) -> tuple[Any, Any, str]: + appdef = hyperactor.host_mesh( + image="test", meshes=[f"{name}:{num_hosts}:gpu.small"] + ) + for role in appdef.roles: + # Note - this is hardcoded to SLURM + # We got this with sinfo + role.resource.memMB = 2062607 + role.resource.cpu = 128 + role.resource.gpu = 8 + + # TODO - multi scheduler support + server_config = Config( + scheduler="slurm", + appdef=appdef, + workspace=monarch.tools.config.workspace.Workspace(dirs=[""]), + ) + server_info = await commands.get_or_create( + "forge_job", + server_config, + force_restart=False, + ) + alloc = RemoteAllocator( + world_id=name, + initializer=TorchXRemoteAllocInitializer(server_info.server_handle), + ) + server_name = f"slurm:///{server_info.name}" + return alloc, None, server_name # (Allocator, AllocConstraints, ServerName) + + async def remote_setup(self, procs: ProcMesh) -> tuple[str, int]: + setup = procs.spawn(f"setup-{uuid.uuid1()}", SetupActor) + return await setup.get_info.choose() + + +class Mastlauncher(BaseLauncher): + def __init__(self, cfg: LauncherConfig | None = None): + assert cfg is not None + self.cfg = cfg + self.default_monarch_port = 26600 + self.scheduler_name = "mast_conda" + + # TODO: enable taking this from config + self.sku = "gtt_any" + self.timeout_sec = 1 * 60 * 60 # Kill the job if idle for 1 hour + self.user = getpass.getuser() + self.work_dir = f"/data/users/{self.user}" + self.editable_workspaces = ["forge"] + self.remote_work_dir = "/packages/monarch_default_workspace/workspace/" + self.editable_workspace_paths = [ + f"{self.work_dir}/{workspace}" for workspace in self.editable_workspaces + ] + self.job_name
= self.cfg.job_name or self.create_job_name() + + async def initialize(self) -> None: + await self.launch_mast_job() + + async def get_allocator(self, name: str, num_hosts: int) -> tuple[Any, Any, str]: + allocator = MastAllocator( + MastAllocatorConfig( + job_name=self.job_name, + remote_allocator_port=self.default_monarch_port, + ), + ) + alloc_constraints = AllocConstraints( + {MastAllocator.ALLOC_LABEL_TASK_GROUP: name} + ) + + return allocator, alloc_constraints, self.create_server_handle() + + async def remote_setup(self, procs: ProcMesh) -> tuple[str, int]: + setup = procs.spawn(f"setup-{uuid.uuid1()}", MastSetupActor) + await setup.mount.call(mount_dst="/mnt/wsfuse") + return await setup.get_info.choose() + + async def launch_mast_job(self): + handle = self.create_server_handle() + server_spec = info(handle) + if server_spec and server_spec.state == AppState.RUNNING: + print(f"Job {self.job_name} is already running. Skipping launch.") + return server_spec + + config = Config( + scheduler="mast_conda", + scheduler_args={ + "hpcIdentity": "hyper_monarch", + "hpcJobOncall": "monarch", + "hpcClusterUuid": "MastProdCluster", + "rmAttribution": "pytorch4all_clients_approved", + }, + appdef=self.build_appdef(), + workspace=Workspace( + dirs=[workspace_dir for workspace_dir in self.editable_workspace_paths], + ), + ) + + await commands.get_or_create(self.job_name, config) + return server_spec + + def add_additional_packages(self, packages: "Packages") -> "Packages": + packages.add_package("oil.oilfs:stable") + packages.add_package("manifold.manifoldfs") + return packages + + def build_appdef(self) -> specs.AppDef: + + # create the app definition for the worker + remote_end_python_path = ":".join( + [ + f"{self.remote_work_dir}{workspace}" + for workspace in self.editable_workspace_paths + ] + ) + + default_envs = { + **meta_hyperactor.DEFAULT_NVRT_ENVS, + **meta_hyperactor.DEFAULT_NCCL_ENVS, + **meta_hyperactor.DEFAULT_TORCH_ENVS, + **{ + "TORCHX_RUN_PYTHONPATH": f"{remote_end_python_path}:{self.remote_work_dir}" + }, + **{ + "HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT_SECS": "600", + "HYPERACTOR_CODE_MAX_FRAME_LENGTH": "1073741824", + "TORCHINDUCTOR_COMPILE_THREADS": "1", + "TORCH_COMPILE_DISABLE": "1", + "TORCHDYNAMO_VERBOSE": "1", + "VLLM_TORCH_COMPILE_LEVEL": "0", + "VLLM_USE_TRITON_FLASH_ATTN": "0", + }, + } + + print("DEFAULT ENVS: ", default_envs) + + packages = Packages() + meshes = [] + # Process both services and actors configurations + for mesh_name, service in self.cfg.services.items(): + num_replicas = service.num_replicas + with_gpus = bool(service.with_gpus) + num_hosts = int(service.hosts or 0) + # Create list of mesh names with indices and num_hosts + if with_gpus and num_hosts > 0: + mesh_list = [ + f"{mesh_name}_{i}:{num_hosts}:{self.sku}" + for i in range(num_replicas) + ] + meshes.extend(mesh_list) + + for mesh_name, actor in self.cfg.actors.items(): + num_replicas = 1 + with_gpus = bool(actor.with_gpus) + num_hosts = int(actor.hosts or 0) + # single actors with GPUs + if with_gpus: + meshes.append(f"{mesh_name}:{num_replicas}:{self.sku}") + + appdef = meta_hyperactor.host_mesh_conda( + meshes=meshes, + additional_packages=self.add_additional_packages(packages), + timeout_sec=self.timeout_sec, + env=default_envs, + ) + + for role in appdef.roles: + role.resource.capabilities["server_sub_types"] = [ + # role.resource.capabilities["server_sub_types"][2] # hardcoded to ROCE + role.resource.capabilities["server_sub_types"][1] # GTT + ] + + return appdef + + def 
create_job_name(self): + return f"{self.user}-forge-{uuid.uuid4().hex[:6]}" + + def create_server_handle(self) -> str: + return f"{self.scheduler_name}:///{self.job_name}" + + +def get_launcher(cfg: LauncherConfig | None = None) -> BaseLauncher | None: + if not cfg or cfg.launcher == Launcher.SLURM: + return Slurmlauncher() + elif cfg.launcher == Launcher.MAST: + return Mastlauncher(cfg) + else: + raise ValueError(f"Unsupported config provided, got {cfg}") diff --git a/src/forge/controller/provisioner.py b/src/forge/controller/provisioner.py index 1951eab76..d66504707 100644 --- a/src/forge/controller/provisioner.py +++ b/src/forge/controller/provisioner.py @@ -12,37 +12,22 @@ import os import socket import uuid +from typing import Optional -import monarch -from monarch._src.actor.allocator import RemoteAllocator, TorchXRemoteAllocInitializer from monarch._src.actor.shape import NDSlice, Shape -from monarch.actor import Actor, endpoint, HostMesh, ProcMesh, this_host +from monarch.actor import HostMesh, ProcMesh, this_host from monarch.tools import commands -from monarch.tools.components import hyperactor -from monarch.tools.config import Config + +from forge.controller.launcher import BaseLauncher, get_launcher from forge.observability.metric_actors import get_or_create_metric_logger -from forge.types import ProcessConfig +from forge.types import ProcessConfig, ProvisionerConfig logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) -def _get_port() -> str: - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(("localhost", 0)) - addr = s.getsockname() - port = addr[1] - return str(port) - - -class _SetupActor(Actor): - @endpoint - def get_info(self) -> [str, str]: - return socket.gethostname(), _get_port() - - class GpuManager: """Tracks and assigns GPU devices on a host. @@ -79,7 +64,7 @@ def release_gpus(self, gpu_ids: list[str]) -> None: class Provisioner: """A global resource provisioner.""" - def __init__(self): + def __init__(self, cfg: ProvisionerConfig | None = None): self._server_names = [] self._proc_server_map = {} self._lock = asyncio.Lock() @@ -108,44 +93,44 @@ def __init__(self): self._host_gpu_map = { self._this_host_id: GpuManager(available_local_devices), } + self.launcher: BaseLauncher | None = get_launcher( + cfg.launcher_config if cfg is not None else None + ) + if not self.launcher: + logger.warning("Launcher not provided, remote allocations will not work.") + + async def initialize(self): + """Call this after creating the instance""" + if self.launcher is not None: + await self.launcher.initialize() async def create_host_mesh(self, name: str, num_hosts: int) -> HostMesh: """Creates a remote server and a HostMesh on it.""" # no need to lock here because this is already locked behind `get_proc_mesh` + if not self.launcher: + raise RuntimeError( + "You tried to create a remote allocation by specifying the number of hosts on an actor or service, " + "but no launcher was specified." 
+ ) logger.debug(f"Creating remote server for alloc {name}") - appdef = hyperactor.host_mesh( - image="test", meshes=[f"{name}:{num_hosts}:gpu.small"] + alloc, alloc_constraints, server_name = await self.launcher.get_allocator( + name, num_hosts ) - for role in appdef.roles: - # Note - this is hardcoded to SLURM - # We got this with sinfo - role.resource.memMB = 2062607 - role.resource.cpu = 128 - role.resource.gpu = 8 - - # TODO - multi scheduler support - server_config = Config( - scheduler="slurm", - appdef=appdef, - workspace=monarch.tools.config.workspace.Workspace(dirs=[""]), - ) - server_info = await commands.get_or_create( - "forge_job", - server_config, - force_restart=False, - ) - alloc = RemoteAllocator( - world_id=name, - initializer=TorchXRemoteAllocInitializer(server_info.server_handle), - ) - server_name = f"slurm:///{server_info.name}" return ( - HostMesh(Shape(["hosts"], NDSlice.new_row_major([num_hosts])), alloc), + HostMesh( + Shape(["hosts"], NDSlice.new_row_major([num_hosts])), + allocator=alloc, + alloc_constraints=alloc_constraints, + ), server_name, ) async def get_proc_mesh( - self, num_procs: int, with_gpus: bool = False, num_hosts: int | None = None + self, + num_procs: int, + with_gpus: bool = False, + num_hosts: int | None = None, + mesh_name: Optional[str] = None, ): """Gets a proc mesh. @@ -157,7 +142,7 @@ async def get_proc_mesh( if num_hosts is not None and num_hosts > 0: created_hosts = len(self._server_names) host_mesh, server_name = await self.create_host_mesh( - name=f"alloc-{created_hosts}", + name=mesh_name, num_hosts=num_hosts, ) host_id = uuid.uuid1() @@ -199,11 +184,10 @@ def bootstrap(gpu_ids: list[str]): per_host={"gpus": num_procs}, bootstrap=functools.partial(bootstrap, gpu_ids=gpu_ids), ) - setup = procs.spawn(f"setup-{uuid.uuid1()}", _SetupActor) # Pick a random host/port, we'll feed this in afterwards # Once we have true HostMesh support, we can do this on proc 0 of each host # then spin up the proc meshes with the environment afterwards. 
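+        # The launcher now owns per-host setup; for MAST this also mounts /mnt/wsfuse before the host/port is reported.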
- hostname, port = await setup.get_info.choose() + hostname, port = await self.launcher.remote_setup(procs) procs._hostname = hostname procs._port = port procs._gpu_ids = gpu_ids @@ -248,25 +232,36 @@ async def shutdown(self): _provisioner: Provisioner | None = None -def _get_provisioner(): +async def init_provisioner(cfg: ProvisionerConfig | None = None): global _provisioner if not _provisioner: - _provisioner = Provisioner() + _provisioner = Provisioner(cfg) + await _provisioner.initialize() + return _provisioner + + +async def _get_provisioner(): + if not _provisioner: + await init_provisioner() return _provisioner async def get_proc_mesh(config: ProcessConfig) -> ProcMesh: - return await _get_provisioner().get_proc_mesh( + provisioner = await _get_provisioner() + return await provisioner.get_proc_mesh( num_procs=config.procs, with_gpus=config.with_gpus, num_hosts=config.hosts, + mesh_name=config.mesh_name, ) async def stop_proc_mesh(proc_mesh: ProcMesh): - return await _get_provisioner().stop_proc_mesh(proc_mesh=proc_mesh) + provisioner = await _get_provisioner() + return await provisioner.stop_proc_mesh(proc_mesh=proc_mesh) async def shutdown(): logger.info("Shutting down provisioner..") - await _get_provisioner().shutdown() + provisioner = await _get_provisioner() + return await provisioner.shutdown() diff --git a/src/forge/controller/service/replica.py b/src/forge/controller/service/replica.py index 09b0a2ce6..dfdb10169 100644 --- a/src/forge/controller/service/replica.py +++ b/src/forge/controller/service/replica.py @@ -159,6 +159,10 @@ async def initialize(self): # Deploy the actor and its underlying resources logger.debug(f"Launching actor for replica {self.idx}") + mesh_name_with_replica = f"{self.proc_config.mesh_name}_{self.idx}" + self.proc_config.mesh_name = mesh_name_with_replica + if hasattr(self.actor_def, "mesh_name"): + self.actor_def.mesh_name = mesh_name_with_replica self.actor = await self.actor_def.launch( *self.actor_args, **self.actor_kwargs, diff --git a/src/forge/types.py b/src/forge/types.py index cc41d2185..f79e3ef2c 100644 --- a/src/forge/types.py +++ b/src/forge/types.py @@ -5,6 +5,7 @@ # LICENSE file in the root directory of this source tree. from dataclasses import dataclass, field +from enum import Enum from typing import Any, TypedDict, Union @@ -87,6 +88,11 @@ class State: metadata: dict[str, Any] = field(default_factory=dict) +class Launcher(Enum): + MAST = "mast" + SLURM = "slurm" + + @dataclass class ProcessConfig: """A proc_mesh config for the torchx scheduler.""" @@ -94,6 +100,7 @@ class ProcessConfig: procs: int = 1 with_gpus: bool = False hosts: int | None = None + mesh_name: str | None = None @dataclass @@ -118,6 +125,7 @@ class ServiceConfig: health_poll_rate: float = 0.2 replica_max_concurrent_requests: int = 10 return_first_rank_result: bool = True + mesh_name: str | None = None def to_process_config(self) -> ProcessConfig: """Extract ProcessConfig from this ServiceConfig. @@ -127,7 +135,25 @@ def to_process_config(self) -> ProcessConfig: procs=self.procs, with_gpus=self.with_gpus, hosts=self.hosts, + mesh_name=self.mesh_name, ) Scalar = Union[int, float] + + +@dataclass +class LauncherConfig: + """A launcher config for the scheduler.""" + + launcher: Launcher + job_name: str + services: dict[str, ServiceConfig] + actors: dict[str, ProcessConfig] + + +@dataclass +class ProvisionerConfig: + """A config for the forge provisioner.""" + + launcher_config: LauncherConfig