diff --git a/.github/workflows/gpu_test.yaml b/.github/workflows/gpu_test.yaml
index 71455c122..1880feffa 100644
--- a/.github/workflows/gpu_test.yaml
+++ b/.github/workflows/gpu_test.yaml
@@ -40,7 +40,17 @@ jobs:
       - name: Install torchforge
         run: pip install uv && uv pip install . && uv pip install .[dev]
       - name: Run unit tests with coverage
-        # TODO add all tests
-        run: pytest tests/unit_tests --cov=. --cov-report=xml --durations=20 -vv
+        run: PYTHONPATH=. pytest tests/unit_tests --cov=. --cov-report=xml --durations=20 -vv
+      - name: Run integration tests with coverage
+        env:
+          TORCHSTORE_RDMA_ENABLED: "0" # Disable RDMA on CI to avoid hangs
+        run: |
+          # set -e ensures the CI fails if any test file fails (not just the last one)
+          set -e
+          # Run each test file in a separate process to avoid Monarch state leakage
+          for test_file in tests/integration_tests/test_*.py; do
+            echo "Running $test_file"
+            PYTHONPATH=. pytest "$test_file" --cov=. --cov-append --cov-report=xml --durations=20 -vvs
+          done
       - name: Upload Coverage to Codecov
         uses: codecov/codecov-action@v3
diff --git a/tests/integration_tests/README.md b/tests/integration_tests/README.md
new file mode 100644
index 000000000..9054a18c8
--- /dev/null
+++ b/tests/integration_tests/README.md
@@ -0,0 +1,33 @@
+# Integration Tests
+
+This directory contains end-to-end integration tests for Forge components.
+
+## Running Tests
+
+### Important: Monarch Cleanup Issues
+
+Monarch has had problems cleaning up state between tests in the past, so **integration tests should NOT be run all together in a single pytest invocation**. Running multiple integration tests in the same process can cause state leakage and test failures.
+
+### Recommended Approach
+
+**Run individual test files:**
+```bash
+PYTHONPATH=. pytest tests/integration_tests/test_grpo_e2e.py -vv
+PYTHONPATH=. pytest tests/integration_tests/test_policy_update.py -vv
+```
+
+**Run all integration tests (each in a separate process):**
+```bash
+for f in tests/integration_tests/test_*.py; do
+  PYTHONPATH=. pytest "$f" -vv
+done
+```
+
+**Run a specific test:**
+```bash
+PYTHONPATH=. pytest -s tests/integration_tests/test_grpo_e2e.py::TestGRPOEndToEnd::test_grpo_smoke_test
+```
+
+## CI Integration
+
+The CI pipeline (`.github/workflows/gpu_test.yaml`) automatically runs all integration tests, executing each test file in a separate process for isolation.
diff --git a/tests/integration_tests/conftest.py b/tests/integration_tests/conftest.py
index cf16a8cf0..9c3d5a1f1 100644
--- a/tests/integration_tests/conftest.py
+++ b/tests/integration_tests/conftest.py
@@ -4,6 +4,7 @@
 # This source code is licensed under the BSD-style license found in the
 # LICENSE file in the root directory of this source tree.
+
 import argparse
 
 import pytest
diff --git a/tests/integration_tests/disabled_test_grpo_e2e.py b/tests/integration_tests/disabled_test_grpo_e2e.py
new file mode 100644
index 000000000..6e35e21ce
--- /dev/null
+++ b/tests/integration_tests/disabled_test_grpo_e2e.py
@@ -0,0 +1,124 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+#
+# This source code is licensed under the BSD-style license found in the
+# LICENSE file in the root directory of this source tree.
+"""A simple smoke test that runs the GRPO loop for a few steps.
+
+Run this with:
+PYTHONPATH=. pytest -s tests/integration_tests/test_grpo_e2e.py::TestGRPOEndToEnd::test_grpo_smoke_test
+
+
+"""
+
+import logging
+import shutil
+from pathlib import Path
+
+import monarch
+import monarch.actor
+
+import pytest
+import torch
+
+from forge.util.config import resolve_hf_hub_paths
+from omegaconf import DictConfig, OmegaConf
+
+# Temporary workaround - without this, proc_mesh.stop
+# will exit with code 1, failing all other tests.
+monarch.actor.unhandled_fault_hook = lambda failure: None
+
+
+logger: logging.Logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
+
+requires_cuda = pytest.mark.skipif(
+    not torch.cuda.is_available(),
+    reason="CUDA not available",
+)
+
+TEST_CHECKPOINT_DIR = "/tmp/grpo_test_checkpoint"
+
+
+def _load_config(config_path: str) -> DictConfig:
+    """Load and resolve config from YAML file."""
+    cfg = None
+    try:
+        cfg = OmegaConf.load(config_path)
+    except Exception as e:
+        pytest.fail(f"Failed to load config file {config_path}: {e}")
+
+    assert isinstance(cfg, DictConfig)
+    cfg = resolve_hf_hub_paths(cfg)
+    return cfg
+
+
+def _cleanup_checkpoint_dir():
+    """Clean up test checkpoint directory."""
+    path = Path(TEST_CHECKPOINT_DIR)
+    if path.exists() and path.is_dir():
+        try:
+            shutil.rmtree(path)
+            logger.info(f"Successfully removed {TEST_CHECKPOINT_DIR}")
+        except Exception as e:
+            logger.error(f"Failed to remove {TEST_CHECKPOINT_DIR}: {e}")
+
+
+class TestGRPOEndToEnd:
+    """End-to-end integration tests for GRPO training loop."""
+
+    @pytest.mark.asyncio
+    @pytest.mark.timeout(600)  # 10 minute timeout to prevent hanging
+    @requires_cuda
+    async def test_grpo_smoke_test(self):
+        """
+        Smoke test for GRPO training loop.
+
+        This test runs the full GRPO pipeline for a few training steps to verify:
+        - All actors and services initialize correctly
+        - Rollout loop generates completions
+        - Rewards are evaluated
+        - Reference model computes logprobs
+        - Replay buffer collects and batches experiences
+        - Trainer updates weights
+        - Policy receives weight updates
+        - Training completes successfully
+        """
+        logger.info("=" * 80)
+        logger.info("Starting GRPO smoke test")
+        logger.info("=" * 80)
+
+        try:
+            # Load test config
+            config_path = "tests/integration_tests/fixtures/grpo_smoke_test.yaml"
+            cfg = _load_config(config_path)
+
+            logger.info("Starting GRPO smoke test with config:")
+            logger.info(f"  Model: {cfg.model}")
+            logger.info(f"  Group size: {cfg.group_size}")
+            logger.info(f"  Batch size: {cfg.local_batch_size}")
+            logger.info(f"  Training steps: {cfg.trainer.training.steps}")
+            logger.info(
+                f"  Max req/res tokens: {cfg.max_req_tokens}/{cfg.max_res_tokens}"
+            )
+
+            # Import main here to avoid issues with module-level imports
+            from apps.grpo.main import main
+
+            logger.info("Starting main training loop...")
+            # Run the main training loop
+            # This should run for the configured number of steps and then exit cleanly
+            await main(cfg)
+
+            logger.info("Main training loop completed successfully")
+            logger.info("GRPO smoke test completed successfully!")
+
+        except Exception as e:
+            logger.error(f"GRPO smoke test failed with error: {e}")
+            raise
+        finally:
+            # Cleanup
+            logger.info("Cleaning up test checkpoint directory...")
+            _cleanup_checkpoint_dir()
+            logger.info("Cleanup complete")
+            logger.info("=" * 80)
diff --git a/tests/integration_tests/fixtures/grpo_smoke_test.yaml b/tests/integration_tests/fixtures/grpo_smoke_test.yaml
new file mode 100644
index 000000000..35da38743
--- /dev/null
+++ b/tests/integration_tests/fixtures/grpo_smoke_test.yaml
@@ -0,0 +1,147 @@
+# Minimal GRPO configuration for integration testing
+# This config is designed to run quickly with minimal resources
+
+# Global configuration
+group_size: 2 # Minimal group size
+local_batch_size: 2 # Minimal batch size
+max_req_tokens: 128 # Reduced from 1024
+max_res_tokens: 128 # Reduced from 1024
+model: "Qwen/Qwen3-1.7B"
+off_by_n: 1
+
+# Main loop configuration
+rollout_threads: 1
+training_threads: 1
+
+# Observability configuration - console only for tests
+metric_logging:
+  console:
+    logging_mode: global_reduce
+
+# Dataset configuration - streaming with limited samples
+dataset:
+  path: "openai/gsm8k"
+  revision: "main"
+  data_split: "train"
+  streaming: true # Required by DatasetActor.sample()
+  model: ${model}
+
+# Policy configuration
+policy:
+  engine_args:
+    model: ${model}
+    tensor_parallel_size: 1
+    pipeline_parallel_size: 1
+    enforce_eager: true # Eager mode for simpler testing
+  sampling_params:
+    n: ${group_size}
+    max_tokens: ${max_res_tokens}
+    temperature: 1.0
+    top_p: 1.0
+
+# Trainer configuration
+trainer:
+  model:
+    name: qwen3
+    flavor: 1.7B
+    hf_assets_path: hf://${model}
+  optimizer:
+    name: AdamW
+    lr: 1e-5
+    eps: 1e-8
+  lr_scheduler:
+    warmup_steps: 1
+  training:
+    local_batch_size: ${local_batch_size}
+    seq_len: ${sum:${max_req_tokens},${max_res_tokens}}
+    max_norm: 1.0
+    steps: 1
+    dtype: bfloat16
+    gc_freq: 1
+  compile:
+    enable: false
+  parallelism:
+    data_parallel_replicate_degree: 1
+    data_parallel_shard_degree: 1
+    tensor_parallel_degree: 1
+    pipeline_parallel_degree: 1
+    context_parallel_degree: 1
+    expert_parallel_degree: 1
+    disable_loss_parallel: true
+  checkpoint:
+    enable: true
+    folder: /tmp/grpo_test_checkpoint
+    initial_load_path: hf://${model}
+    initial_load_in_hf: true
+    last_save_in_hf: false # Don't save back to HF in tests
+    interval: 500
+    async_mode: "disabled"
+  activation_checkpoint:
+    mode: selective
+    selective_ac_option: op
+
+# Replay buffer configuration
+replay_buffer:
+  batch_size: ${local_batch_size}
+  max_policy_age: ${off_by_n}
+  dp_size: ${trainer.parallelism.data_parallel_shard_degree}
+
+# Reference model configuration
+ref_model:
+  model:
+    name: qwen3
+    flavor: 1.7B
+    hf_assets_path: hf://${model}
+  training:
+    seq_len: ${trainer.training.seq_len}
+    dtype: bfloat16
+    gc_freq: 1
+  compile:
+    enable: false
+  parallelism:
+    data_parallel_replicate_degree: 1
+    data_parallel_shard_degree: 1
+    tensor_parallel_degree: 1
+    pipeline_parallel_degree: 1
+    context_parallel_degree: 1
+    expert_parallel_degree: 1
+  checkpoint:
+    enable: true
+    initial_load_path: hf://${model}
+    initial_load_in_hf: true
+
+# All resource allocations
+services:
+  policy:
+    procs: ${policy.engine_args.tensor_parallel_size}
+    num_replicas: 1
+    mesh_name: policy
+    with_gpus: true
+  ref_model:
+    procs: 1
+    num_replicas: 1
+    mesh_name: ref_model
+    with_gpus: true
+  reward_actor:
+    procs: 1
+    num_replicas: 1
+    mesh_name: reward_actor
+    with_gpus: false
+
+actors:
+  dataset:
+    procs: 1
+    with_gpus: false
+    mesh_name: dataset
+  trainer:
+    procs: 1
+    with_gpus: true
+    mesh_name: trainer
+  replay_buffer:
+    procs: 1
+    with_gpus: false
+    mesh_name: replay_buffer
+  compute_advantages:
+    procs: 1
+    with_gpus: false
+    mesh_name: compute_advantages
diff --git a/tests/integration_tests/test_coder.py b/tests/integration_tests/test_coder.py
deleted file mode 100644
index 45a80ec4d..000000000
--- a/tests/integration_tests/test_coder.py
+++ /dev/null
@@ -1,89 +0,0 @@
-# Copyright (c) Meta Platforms, Inc. and affiliates.
-# All rights reserved.
-#
-# This source code is licensed under the BSD-style license found in the
-# LICENSE file in the root directory of this source tree.
-
-"""
-Integration tests for forge.actors.coder.SandboxedPythonCoder.
-
-Requires enroot to be installed.
-
-"""
-
-import os
-import uuid
-
-import pytest
-
-from forge.actors.coder import SandboxedPythonCoder
-
-
-@pytest.mark.timeout(30)
-@pytest.mark.asyncio
-async def test_coder_runs_python():
-    """Integration test for SandboxedPythonCoder with real container execution."""
-    # Create unique names to avoid test conflicts
-    unique_id = str(uuid.uuid1())
-    container_name = f"test_sandbox_{unique_id}"
-    image_path = f"/tmp/python_test_{unique_id}.sqsh"
-
-    coder = None
-    try:
-        coder = await SandboxedPythonCoder.as_actor(
-            docker_image="docker://python:3.10",
-            sqsh_image_path=image_path,
-            container_name=container_name,
-        )
-
-        # Execute code
-        results, _ = await coder.execute.call_one(
-            code="print('hello world')",
-        )
-        print("Got results", results)
-        assert results == "hello world\n"
-
-    finally:
-        # Clean up resources
-        if coder:
-            await SandboxedPythonCoder.shutdown(coder)
-
-        # Clean up the image file
-        if os.path.exists(image_path):
-            os.unlink(image_path)
-
-
-@pytest.mark.timeout(30)
-@pytest.mark.asyncio
-async def test_coder_catches_error():
-    """Integration test for SandboxedPythonCoder with real container execution."""
-    # Create unique names to avoid test conflicts
-    unique_id = str(uuid.uuid1())
-    container_name = f"test_sandbox_{unique_id}"
-    image_path = f"/tmp/python_test_{unique_id}.sqsh"
-
-    coder = None
-    try:
-        print("starting test")
-        coder = await SandboxedPythonCoder.as_actor(
-            docker_image="docker://python:3.10",
-            sqsh_image_path=image_path,
-            container_name=container_name,
-        )
-        print("Got coder")
-
-        # Execute code
-        _, stderr = await coder.execute.call_one(
-            code="hello world",
-        )
-        print("got stderr", stderr)
-        assert "SyntaxError" in stderr
-
-    finally:
-        # Clean up resources
-        if coder:
-            await SandboxedPythonCoder.shutdown(coder)
-
-        # Clean up the image file
-        if os.path.exists(image_path):
-            os.unlink(image_path)
diff --git a/tests/integration_tests/test_grpo_e2e.py b/tests/integration_tests/test_grpo_e2e.py
new file mode 100644
index 000000000..6e35e21ce
--- /dev/null
+++ b/tests/integration_tests/test_grpo_e2e.py
@@ -0,0 +1,124 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+#
+# This source code is licensed under the BSD-style license found in the
+# LICENSE file in the root directory of this source tree.
+"""A simple smoke test that runs the GRPO loop for a few steps.
+
+Run this with:
+PYTHONPATH=. pytest -s tests/integration_tests/test_grpo_e2e.py::TestGRPOEndToEnd::test_grpo_smoke_test
+
+
+"""
+
+import logging
+import shutil
+from pathlib import Path
+
+import monarch
+import monarch.actor
+
+import pytest
+import torch
+
+from forge.util.config import resolve_hf_hub_paths
+from omegaconf import DictConfig, OmegaConf
+
+# Temporary workaround - without this, proc_mesh.stop
+# will exit with code 1, failing all other tests.
+monarch.actor.unhandled_fault_hook = lambda failure: None
+
+
+logger: logging.Logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
+
+requires_cuda = pytest.mark.skipif(
+    not torch.cuda.is_available(),
+    reason="CUDA not available",
+)
+
+TEST_CHECKPOINT_DIR = "/tmp/grpo_test_checkpoint"
+
+
+def _load_config(config_path: str) -> DictConfig:
+    """Load and resolve config from YAML file."""
+    cfg = None
+    try:
+        cfg = OmegaConf.load(config_path)
+    except Exception as e:
+        pytest.fail(f"Failed to load config file {config_path}: {e}")
+
+    assert isinstance(cfg, DictConfig)
+    cfg = resolve_hf_hub_paths(cfg)
+    return cfg
+
+
+def _cleanup_checkpoint_dir():
+    """Clean up test checkpoint directory."""
+    path = Path(TEST_CHECKPOINT_DIR)
+    if path.exists() and path.is_dir():
+        try:
+            shutil.rmtree(path)
+            logger.info(f"Successfully removed {TEST_CHECKPOINT_DIR}")
+        except Exception as e:
+            logger.error(f"Failed to remove {TEST_CHECKPOINT_DIR}: {e}")
+
+
+class TestGRPOEndToEnd:
+    """End-to-end integration tests for GRPO training loop."""
+
+    @pytest.mark.asyncio
+    @pytest.mark.timeout(600)  # 10 minute timeout to prevent hanging
+    @requires_cuda
+    async def test_grpo_smoke_test(self):
+        """
+        Smoke test for GRPO training loop.
+
+        This test runs the full GRPO pipeline for a few training steps to verify:
+        - All actors and services initialize correctly
+        - Rollout loop generates completions
+        - Rewards are evaluated
+        - Reference model computes logprobs
+        - Replay buffer collects and batches experiences
+        - Trainer updates weights
+        - Policy receives weight updates
+        - Training completes successfully
+        """
+        logger.info("=" * 80)
+        logger.info("Starting GRPO smoke test")
+        logger.info("=" * 80)
+
+        try:
+            # Load test config
+            config_path = "tests/integration_tests/fixtures/grpo_smoke_test.yaml"
+            cfg = _load_config(config_path)
+
+            logger.info("Starting GRPO smoke test with config:")
+            logger.info(f"  Model: {cfg.model}")
+            logger.info(f"  Group size: {cfg.group_size}")
+            logger.info(f"  Batch size: {cfg.local_batch_size}")
+            logger.info(f"  Training steps: {cfg.trainer.training.steps}")
+            logger.info(
+                f"  Max req/res tokens: {cfg.max_req_tokens}/{cfg.max_res_tokens}"
+            )
+
+            # Import main here to avoid issues with module-level imports
+            from apps.grpo.main import main
+
+            logger.info("Starting main training loop...")
+            # Run the main training loop
+            # This should run for the configured number of steps and then exit cleanly
+            await main(cfg)
+
+            logger.info("Main training loop completed successfully")
+            logger.info("GRPO smoke test completed successfully!")
+
+        except Exception as e:
+            logger.error(f"GRPO smoke test failed with error: {e}")
+            raise
+        finally:
+            # Cleanup
+            logger.info("Cleaning up test checkpoint directory...")
+            _cleanup_checkpoint_dir()
+            logger.info("Cleanup complete")
+            logger.info("=" * 80)
diff --git a/tests/integration_tests/test_policy_update.py b/tests/integration_tests/test_policy_update.py
index d4151b5b6..804d22f23 100644
--- a/tests/integration_tests/test_policy_update.py
+++ b/tests/integration_tests/test_policy_update.py
@@ -152,9 +152,9 @@ async def _setup_and_teardown(request):
     # ---- setup ---- #
     config_path = request.config.getoption("--config", default=None)
     if not config_path:
-        pytest.skip(
-            "No config file provided. Use --config to specify a YAML config file"
-        )
+        # Use default config if none provided
+        config_path = "tests/integration_tests/fixtures/qwen3_1_7b_no_tp.yaml"
+        logger.info(f"No config provided, using default: {config_path}")
 
     use_dcp_override = request.config.getoption("--use_dcp")
     cfg = _load_config(config_path=config_path)
@@ -257,7 +257,7 @@ async def test_sanity_check(self, _setup_and_teardown):
         await policy.save_model_params.fanout()
 
         # Sanity check that before update all the tests pass
-        all_errs = await policy.validate_model_params.fanout(
+        all_errs = await policy._test_validate_model_params.fanout(
            _test_validate_params_unchanged
         )
         for errs in all_errs:
@@ -265,7 +265,7 @@
                assert not e, f"Validation failed with exception: {e}"
 
         await policy.update_weights.fanout(version=v1)
-        all_errs = await policy.validate_model_params.fanout(
+        all_errs = await policy._test_validate_model_params.fanout(
            _test_validate_params_all_zeros
         )
         for errs in all_errs:
@@ -274,7 +274,7 @@
 
         # Reloading v0, getting back original weights
         await policy.update_weights.fanout(version=v0)
-        all_errs = await policy.validate_model_params.fanout(
+        all_errs = await policy._test_validate_model_params.fanout(
            _test_validate_params_unchanged
         )
         for errs in all_errs: