diff --git a/README.md b/README.md index 9ef19030..d4cb64c5 100644 --- a/README.md +++ b/README.md @@ -102,6 +102,19 @@ You can find the tutorial series below: - [Part 2](examples/hello-world/hello_experiments.ipynb). - [Part 3](examples/hello-world/hello_scripts.py). +#### Lepton Executor Examples + +The Lepton executor examples demonstrate comprehensive distributed training workflows on Lepton clusters: + +- **Distributed Training**: Multi-node, multi-GPU setups with automatic scaling +- **Secure Secret Management**: Use workspace secrets instead of hardcoded tokens +- **Storage Integration**: Remote storage mounting and data management +- **Container Orchestration**: Advanced environment setup and dependency management +- **Production Workflows**: End-to-end ML training pipelines + +You can find the Lepton examples here: +- [Lepton Distributed Training Examples](examples/lepton/) + ## Contribute to NeMo Run Please see the [contribution guide](./CONTRIBUTING.md) to contribute to NeMo Run. diff --git a/examples/lepton/README.md b/examples/lepton/README.md new file mode 100644 index 00000000..b96e2990 --- /dev/null +++ b/examples/lepton/README.md @@ -0,0 +1,117 @@ +# Lepton Executor Examples + +This directory contains examples demonstrating how to use the `LeptonExecutor` for distributed machine learning workflows on Lepton clusters. + +## Examples + +### 🚀 finetune.py + +A comprehensive example showing how to use the `LeptonExecutor` for **distributed NeMo model fine-tuning** with advanced features including secure secret management, remote storage, and custom environment setup. + +#### Usage Examples + +**Basic Fine-tuning:** +```bash +# Single-node, single-GPU setup +python finetune.py + +# The example will: +# 1. Create a LeptonExecutor with comprehensive configuration +# 2. Set up NeMo fine-tuning recipe with LoRA +# 3. Launch distributed training with monitoring +# 4. 
Handle resource management and cleanup +``` + +**Distributed Training:** +```python +# Multi-node setup (modify in the script) +nodes = 4 +gpus_per_node = 8 +# Will automatically configure FSDP2 strategy for 32 total GPUs +``` + +#### Configuration Guide + +**Resource Configuration:** +```python +# Adjust these based on your Lepton workspace +resource_shape="gpu.8xh100-80gb" # GPU type and count +node_group="your-node-group-name" # Your Lepton node group +``` + +**Storage Setup:** +```python +mounts=[{ + "from": "node-nfs:your-storage", # Storage source + "path": "/path/to/your/remote/storage", # Remote path + "mount_path": "/nemo-workspace", # Container mount point +}] +``` + +**Secret Management:** + +For sensitive data like API tokens: +```python +# NOT RECOMMENDED - Hardcoded secrets +env_vars={ + "HF_TOKEN": "hf_your_actual_token_here", # Exposed in code! +} + +# RECOMMENDED - Secure secret references +env_vars={ + "HF_TOKEN": {"value_from": {"secret_name_ref": "HUGGING_FACE_HUB_TOKEN_read"}}, + "WANDB_API_KEY": {"value_from": {"secret_name_ref": "WANDB_API_KEY_secret"}}, + # Regular env vars can still be set directly + "NCCL_DEBUG": "INFO", + "TORCH_DISTRIBUTED_DEBUG": "INFO", +} +``` + +#### Prerequisites + +**1. Lepton Workspace Setup:** +- Node groups configured with appropriate GPUs +- Shared storage mounted and accessible +- Container registry access for NeMo images + +**2. Optional Secrets (for enhanced security):** +```bash +# Create these secrets in your Lepton workspace +HUGGING_FACE_HUB_TOKEN_read # For HuggingFace model access +WANDB_API_KEY_secret # For experiment tracking +``` + +**3. Resource Requirements:** +- GPU nodes (H100, A100, V100, etc.) 
+- Sufficient shared storage space +- Network connectivity for container pulls + +#### Advanced Features + +**Custom Pre-launch Commands:** +```python +pre_launch_commands=[ + "echo 'šŸš€ Starting setup...'", + "nvidia-smi", # Check GPU status + "df -h", # Check disk space + "python3 -m pip install 'datasets>=4.0.0'", # Install dependencies + "export CUSTOM_VAR=value", # Set environment +] +``` + +**Training Strategy Selection:** +```python +# Automatic strategy selection for single node +if nodes == 1: + recipe.trainer.strategy = "auto" + +# FSDP2 for multi-node distributed training +else: + recipe.trainer.strategy = run.Config( + nl.FSDP2Strategy, + data_parallel_size=nodes * gpus_per_node, + tensor_parallel_size=1 + ) +``` + +For more details on Lepton cluster management and configuration, refer to the Lepton documentation. diff --git a/examples/lepton/finetune.py b/examples/lepton/finetune.py new file mode 100644 index 00000000..5d00c886 --- /dev/null +++ b/examples/lepton/finetune.py @@ -0,0 +1,165 @@ +#!/usr/bin/env python3 +""" +NeMo Fine-tuning with Lepton Executor + +This comprehensive example demonstrates how to use the LeptonExecutor for distributed +NeMo model fine-tuning with various advanced features. + +Prerequisites: +- Lepton workspace with proper node groups and GPU resources +- Secrets configured in your Lepton workspace (optional but recommended) +- Shared storage accessible to your Lepton cluster +- NeMo container image available + +This example serves as a template for production ML workflows on Lepton clusters. +""" + +from nemo.collections import llm +import nemo_run as run +from nemo import lightning as nl + + +def nemo_lepton_executor(nodes: int, devices: int, container_image: str): + """ + Create a LeptonExecutor with secret handling capabilities. 
+ + Args: + nodes: Number of nodes for distributed training + devices: Number of GPUs per node + container_image: Docker container image to use + + Returns: + Configured LeptonExecutor with secret support + """ + + return run.LeptonExecutor( + # Required parameters + container_image=container_image, + nemo_run_dir="/nemo-workspace", # Directory for NeMo Run files on remote storage + # Lepton compute configuration + nodes=nodes, + gpus_per_node=devices, + nprocs_per_node=devices, # Number of processes per node (usually = gpus_per_node) + # Lepton workspace configuration - REQUIRED for actual usage + resource_shape="gpu.1xh200", # Specify GPU type/count - adjust as needed + node_group="your-node-group-name", # Specify your node group - must exist in workspace + # Remote storage mounts (using correct mount structure) + mounts=[ + { + "from": "node-nfs:your-shared-storage", + "path": "/path/to/your/remote/storage", # Remote storage path + "mount_path": "/nemo-workspace", # Mount path in container + } + ], + # Environment variables - SECURE SECRET HANDLING + env_vars={ + # SECRET REFERENCES (recommended for sensitive data) + # These reference secrets stored securely in your Lepton workspace + "HF_TOKEN": {"value_from": {"secret_name_ref": "HUGGING_FACE_HUB_TOKEN_read"}}, + "WANDB_API_KEY": { + "value_from": {"secret_name_ref": "WANDB_API_KEY_secret"} + }, # Optional + # šŸ“‹ REGULAR ENVIRONMENT VARIABLES + # Non-sensitive configuration can be set directly + "NCCL_DEBUG": "INFO", + "TORCH_DISTRIBUTED_DEBUG": "INFO", + "CUDA_LAUNCH_BLOCKING": "1", + "TOKENIZERS_PARALLELISM": "false", + }, + # Shared memory size for inter-process communication + shared_memory_size=65536, + # Custom commands to run before launching the training + pre_launch_commands=[ + "echo 'šŸš€ Starting NeMo fine-tuning with Lepton secrets...'", + "nvidia-smi", + "df -h", + "python3 -m pip install 'datasets>=4.0.0'", + "python3 -m pip install 'transformers>=4.40.0'", + ], + ) + + +def 
create_finetune_recipe(nodes: int, gpus_per_node: int): + """ + Create a NeMo fine-tuning recipe with LoRA. + + Args: + nodes: Number of nodes for distributed training + gpus_per_node: Number of GPUs per node + + Returns: + Configured NeMo recipe for fine-tuning + """ + + recipe = llm.hf_auto_model_for_causal_lm.finetune_recipe( + model_name="meta-llama/Llama-3.2-3B", # Model to fine-tune + dir="/nemo-workspace/llama3.2_3b_lepton", # Use nemo-workspace mount path + name="llama3_lora_lepton", + num_nodes=nodes, + num_gpus_per_node=gpus_per_node, + peft_scheme="lora", # Parameter-Efficient Fine-Tuning with LoRA + max_steps=100, # Adjust based on your needs + ) + + # LoRA configuration + recipe.peft.target_modules = ["linear_qkv", "linear_proj", "linear_fc1", "*_proj"] + recipe.peft.dim = 16 + recipe.peft.alpha = 32 + + # Strategy configuration for distributed training + if nodes == 1: + recipe.trainer.strategy = "auto" # Let Lightning choose the best strategy + else: + recipe.trainer.strategy = run.Config( + nl.FSDP2Strategy, data_parallel_size=nodes * gpus_per_node, tensor_parallel_size=1 + ) + + return recipe + + +if __name__ == "__main__": + # Configuration + nodes = 1 # Start with single node for testing + gpus_per_node = 1 + + # Create the fine-tuning recipe + recipe = create_finetune_recipe(nodes, gpus_per_node) + + # Create the executor with secret handling + executor = nemo_lepton_executor( + nodes=nodes, + devices=gpus_per_node, + container_image="nvcr.io/nvidia/nemo:25.04", # Use appropriate NeMo container + ) + + # Optional: Check executor capabilities + print("šŸ” Executor Information:") + print(f"šŸ“‹ Nodes: {executor.nnodes()}") + print(f"šŸ“‹ Processes per node: {executor.nproc_per_node()}") + + # Check macro support + macro_values = executor.macro_values() + print(f"šŸ“‹ Macro values support: {macro_values is not None}") + + try: + # Create and run the experiment + with run.Experiment( + "lepton-nemo-secrets-demo", executor=executor, 
log_level="DEBUG" + ) as exp: + # Add the fine-tuning task + task_id = exp.add(recipe, tail_logs=True, name="llama3_lora_with_secrets") + + # Execute the experiment + print("Starting fine-tuning experiment with secure secret handling...") + exp.run(detach=False, tail_logs=True, sequential=True) + + print("Experiment completed successfully!") + + except Exception as e: + print(f"\n Error occurred: {type(e).__name__}") + print(f" Message: {str(e)}") + print("\nšŸ’” Common issues to check:") + print(" - Ensure your Lepton workspace has the required secrets configured") + print(" - Verify node_group and resource_shape match your workspace") + print(" - Check that mount paths are correct and accessible") + print(" - Confirm container image is available and compatible") diff --git a/nemo_run/core/execution/lepton.py b/nemo_run/core/execution/lepton.py index 61a70b43..5449802f 100644 --- a/nemo_run/core/execution/lepton.py +++ b/nemo_run/core/execution/lepton.py @@ -17,6 +17,7 @@ from leptonai.api.v1.types.dedicated_node_group import DedicatedNodeGroup from leptonai.api.v1.types.deployment import ( EnvVar, + EnvValue, LeptonContainer, Mount, ) @@ -232,7 +233,16 @@ def create_lepton_job(self, name: str): """ client = APIClient() - envs = [EnvVar(name=key, value=value) for key, value in self.env_vars.items()] + # Process environment variables - handle both regular values and secret references + envs = [] + for key, value in self.env_vars.items(): + if isinstance(value, dict) and "value_from" in value: + # Handle secret reference + secret_name_ref = value["value_from"]["secret_name_ref"] + envs.append(EnvVar(name=key, value_from=EnvValue(secret_name_ref=secret_name_ref))) + else: + # Handle regular environment variable + envs.append(EnvVar(name=key, value=str(value))) cmd = ["/bin/bash", "-c", f"bash {self.lepton_job_dir}/launch_script.sh"] diff --git a/test/core/execution/test_lepton.py b/test/core/execution/test_lepton.py index 7fdc08cc..0d4bd52f 100644 --- 
a/test/core/execution/test_lepton.py +++ b/test/core/execution/test_lepton.py @@ -21,6 +21,8 @@ import pytest from leptonai.api.v1.types.common import LeptonVisibility, Metadata from leptonai.api.v1.types.deployment import ( + EnvVar, + EnvValue, LeptonContainer, LeptonResourceAffinity, Mount, @@ -973,3 +975,231 @@ def test_launch_prelaunch_commands_join( handle = mock_file.return_value.__enter__.return_value written_content = handle.write.call_args[0][0] assert "echo setup\nexport VAR=1\n" in written_content + + @patch("nemo_run.core.execution.lepton.APIClient") + def test_env_vars_with_secrets(self, mock_APIClient_class): + """Test that environment variables with secret references are processed correctly.""" + mock_client = mock_APIClient_class.return_value + mock_client.job.create.return_value = LeptonJob(metadata=Metadata(id="my-lepton-job")) + node_group = SimpleNamespace(metadata=SimpleNamespace(id_="123456")) + + mock_client.nodegroup.list_all.return_value = [] + valid_node_ids = ["node-id-1", "node-id-2"] + + # Test executor with mixed environment variables (secrets and regular values) + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + node_group="123456", + mounts=[{"path": "/test", "mount_path": "/test"}], + env_vars={ + # Secret reference + "HF_TOKEN": {"value_from": {"secret_name_ref": "HUGGING_FACE_HUB_TOKEN_read"}}, + "WANDB_API_KEY": {"value_from": {"secret_name_ref": "WANDB_API_KEY_secret"}}, + # Regular environment variables + "NCCL_DEBUG": "INFO", + "TORCH_DISTRIBUTED_DEBUG": "INFO", + "NUM_WORKERS": 4, # Test integer conversion + "ENABLE_FEATURE": True, # Test boolean conversion + }, + ) + executor._valid_node_ids = MagicMock(return_value=valid_node_ids) + executor._node_group_id = MagicMock(return_value=node_group) + + executor.create_lepton_job("my-lepton-job") + + # Verify job.create was called + mock_client.job.create.assert_called_once() + created_job = mock_client.job.create.call_args[0][0] + + # 
Check that envs are correctly processed + envs = created_job.spec.envs + assert len(envs) == 6 # Should have all 6 environment variables + + # Create a dictionary for easier testing + env_dict = {env.name: env for env in envs} + + # Verify secret references are properly set + assert "HF_TOKEN" in env_dict + hf_token_env = env_dict["HF_TOKEN"] + assert hf_token_env.value is None # Should not have direct value + assert hf_token_env.value_from is not None + assert hf_token_env.value_from.secret_name_ref == "HUGGING_FACE_HUB_TOKEN_read" + + assert "WANDB_API_KEY" in env_dict + wandb_env = env_dict["WANDB_API_KEY"] + assert wandb_env.value is None # Should not have direct value + assert wandb_env.value_from is not None + assert wandb_env.value_from.secret_name_ref == "WANDB_API_KEY_secret" + + # Verify regular environment variables are properly set + assert "NCCL_DEBUG" in env_dict + nccl_env = env_dict["NCCL_DEBUG"] + assert nccl_env.value == "INFO" + assert nccl_env.value_from is None # Should not have secret reference + + assert "TORCH_DISTRIBUTED_DEBUG" in env_dict + torch_env = env_dict["TORCH_DISTRIBUTED_DEBUG"] + assert torch_env.value == "INFO" + assert torch_env.value_from is None # Should not have secret reference + + # Verify type conversion for non-string values + assert "NUM_WORKERS" in env_dict + workers_env = env_dict["NUM_WORKERS"] + assert workers_env.value == "4" # Should be converted to string + assert workers_env.value_from is None + + assert "ENABLE_FEATURE" in env_dict + feature_env = env_dict["ENABLE_FEATURE"] + assert feature_env.value == "True" # Should be converted to string + assert feature_env.value_from is None + + @patch("nemo_run.core.execution.lepton.APIClient") + def test_env_vars_all_regular_values(self, mock_APIClient_class): + """Test that executor works correctly with only regular environment variables.""" + mock_client = mock_APIClient_class.return_value + mock_client.job.create.return_value = 
LeptonJob(metadata=Metadata(id="my-lepton-job")) + node_group = SimpleNamespace(metadata=SimpleNamespace(id_="123456")) + + mock_client.nodegroup.list_all.return_value = [] + valid_node_ids = ["node-id-1", "node-id-2"] + + # Test executor with only regular environment variables + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + node_group="123456", + mounts=[{"path": "/test", "mount_path": "/test"}], + env_vars={ + "NCCL_DEBUG": "INFO", + "CUDA_VISIBLE_DEVICES": "0,1,2,3", + "TRAINING_STEPS": 1000, + }, + ) + executor._valid_node_ids = MagicMock(return_value=valid_node_ids) + executor._node_group_id = MagicMock(return_value=node_group) + + executor.create_lepton_job("my-lepton-job") + + # Verify job.create was called + mock_client.job.create.assert_called_once() + created_job = mock_client.job.create.call_args[0][0] + + # Check that all envs are regular values + envs = created_job.spec.envs + assert len(envs) == 3 + + for env in envs: + assert env.value is not None # All should have direct values + assert env.value_from is None # None should have secret references + + @patch("nemo_run.core.execution.lepton.APIClient") + def test_env_vars_all_secrets(self, mock_APIClient_class): + """Test that executor works correctly with only secret environment variables.""" + mock_client = mock_APIClient_class.return_value + mock_client.job.create.return_value = LeptonJob(metadata=Metadata(id="my-lepton-job")) + node_group = SimpleNamespace(metadata=SimpleNamespace(id_="123456")) + + mock_client.nodegroup.list_all.return_value = [] + valid_node_ids = ["node-id-1", "node-id-2"] + + # Test executor with only secret environment variables + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + node_group="123456", + mounts=[{"path": "/test", "mount_path": "/test"}], + env_vars={ + "API_KEY": {"value_from": {"secret_name_ref": "api_key_secret"}}, + "DATABASE_PASSWORD": {"value_from": {"secret_name_ref": 
"db_password_secret"}}, + "JWT_SECRET": {"value_from": {"secret_name_ref": "jwt_secret"}}, + }, + ) + executor._valid_node_ids = MagicMock(return_value=valid_node_ids) + executor._node_group_id = MagicMock(return_value=node_group) + + executor.create_lepton_job("my-lepton-job") + + # Verify job.create was called + mock_client.job.create.assert_called_once() + created_job = mock_client.job.create.call_args[0][0] + + # Check that all envs are secret references + envs = created_job.spec.envs + assert len(envs) == 3 + + for env in envs: + assert env.value is None # All should NOT have direct values + assert env.value_from is not None # All should have secret references + assert env.value_from.secret_name_ref is not None + + @patch("nemo_run.core.execution.lepton.APIClient") + def test_env_vars_empty_dict(self, mock_APIClient_class): + """Test that executor works correctly with empty env_vars dictionary.""" + mock_client = mock_APIClient_class.return_value + mock_client.job.create.return_value = LeptonJob(metadata=Metadata(id="my-lepton-job")) + node_group = SimpleNamespace(metadata=SimpleNamespace(id_="123456")) + + mock_client.nodegroup.list_all.return_value = [] + valid_node_ids = ["node-id-1", "node-id-2"] + + # Test executor with empty env_vars + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + node_group="123456", + mounts=[{"path": "/test", "mount_path": "/test"}], + env_vars={}, + ) + executor._valid_node_ids = MagicMock(return_value=valid_node_ids) + executor._node_group_id = MagicMock(return_value=node_group) + + executor.create_lepton_job("my-lepton-job") + + # Verify job.create was called + mock_client.job.create.assert_called_once() + created_job = mock_client.job.create.call_args[0][0] + + # Check that envs list is empty + envs = created_job.spec.envs + assert len(envs) == 0 + + def test_env_vars_secret_structure_validation(self): + """Test that the secret structure is validated correctly.""" + + # Test that a 
properly structured secret dict is detected + test_env_vars = { + "SECRET_VAR": {"value_from": {"secret_name_ref": "my_secret"}}, + "REGULAR_VAR": "regular_value", + "DICT_WITHOUT_VALUE_FROM": {"some_key": "some_value"}, + } + + # Process the environment variables manually to test the logic + envs = [] + for key, value in test_env_vars.items(): + if isinstance(value, dict) and "value_from" in value: + # Handle secret reference + secret_name_ref = value["value_from"]["secret_name_ref"] + envs.append(EnvVar(name=key, value_from=EnvValue(secret_name_ref=secret_name_ref))) + else: + # Handle regular environment variable + envs.append(EnvVar(name=key, value=str(value))) + + # Verify correct processing + assert len(envs) == 3 + + # Check secret variable + secret_env = next(env for env in envs if env.name == "SECRET_VAR") + assert secret_env.value is None + assert secret_env.value_from is not None + assert secret_env.value_from.secret_name_ref == "my_secret" + + # Check regular variable + regular_env = next(env for env in envs if env.name == "REGULAR_VAR") + assert regular_env.value == "regular_value" + assert regular_env.value_from is None + + # Check dict without value_from (should be treated as regular) + dict_env = next(env for env in envs if env.name == "DICT_WITHOUT_VALUE_FROM") + assert dict_env.value == "{'some_key': 'some_value'}" # Converted to string + assert dict_env.value_from is None