diff --git a/.github/workflows/docker/docker-compose.yaml b/.github/workflows/docker/docker-compose.yaml
new file mode 100644
index 0000000000..e04a722aeb
--- /dev/null
+++ b/.github/workflows/docker/docker-compose.yaml
@@ -0,0 +1,59 @@
+services:
+  trinity-node-1:
+    image: trinity-rft:latest-unittest
+    pull_policy: never
+    command: sh -c "pip install -e .[dev] && ray start --head --dashboard-host 0.0.0.0 --include-dashboard true --block"
+    environment:
+      - HF_ENDPOINT=https://hf-mirror.com
+      - RAY_ADDRESS=auto
+      - CHECKPOINT_ROOT_DIR=/mnt/checkpoints
+      - DATA_ROOT_DIR=/mnt/data
+      - MODEL_PATH=/mnt/checkpoints/Qwen2.5-1.5B-Instruct
+    working_dir: /workspace
+    networks:
+      - trinity-network
+    volumes:
+      - trinity-volume:/mnt
+      - ../../..:/workspace
+    shm_size: "64G"
+    deploy:
+      resources:
+        reservations:
+          devices:
+            - driver: nvidia
+              device_ids: ['4', '5']
+              capabilities: [gpu]
+
+  trinity-node-2:
+    image: trinity-rft:latest-unittest
+    pull_policy: never
+    command: sh -c "pip install -e .[dev] && ray start --address=trinity-node-1:6379 --block"
+    environment:
+      - HF_ENDPOINT=https://hf-mirror.com
+      - CHECKPOINT_ROOT_DIR=/mnt/checkpoints
+      - DATA_ROOT_DIR=/mnt/data
+      - MODEL_PATH=/mnt/checkpoints/Qwen2.5-1.5B-Instruct
+    working_dir: /workspace
+    volumes:
+      - trinity-volume:/mnt
+      - ../../..:/workspace
+    depends_on:
+      - trinity-node-1
+    networks:
+      - trinity-network
+    shm_size: "64G"
+    deploy:
+      resources:
+        reservations:
+          devices:
+            - driver: nvidia
+              device_ids: ['6', '7']
+              capabilities: [gpu]
+
+networks:
+  trinity-network:
+    driver: bridge
+
+volumes:
+  trinity-volume:
+    external: true
diff --git a/.github/workflows/unittest.yaml b/.github/workflows/unittest.yaml
new file mode 100644
index 0000000000..12e9165f42
--- /dev/null
+++ b/.github/workflows/unittest.yaml
@@ -0,0 +1,44 @@
+name: unittest
+
+on:
+  issue_comment:
+    types: [created]
+
+permissions:
+  contents: read
+
+jobs:
+  unittest:
+    # only run on pull request
+    if: ${{ github.event.issue.pull_request && startsWith(github.event.comment.body, '/run-unittest') && github.event.comment.author_association == 'COLLABORATOR' }}
+    runs-on: self-hosted
+
+    steps:
+      - uses: actions/checkout@v3
+        with:
+          path: trinity-${{ github.run_id }}
+          fetch-depth: 0
+
+      - name: Setup docker compose
+        working-directory: trinity-${{ github.run_id }}/.github/workflows/docker
+        run: |
+          docker compose up -d
+
+      - name: Run unittest
+        working-directory: trinity-${{ github.run_id }}/.github/workflows/docker
+        run: |
+          docker compose exec trinity-node-1 pytest tests --ignore=tests/data --junitxml=pytest.xml
+        continue-on-error: true
+
+      - name: Upload test results
+        uses: actions/upload-artifact@v2
+        with:
+          name: pytest-results
+          path: trinity-${{ github.run_id }}/pytest.xml
+
+      - name: Pytest coverage comment
+        uses: MishaKav/pytest-coverage-comment@main
+        with:
+          junitxml-title: Unittest Result Summary
+          junitxml-path: trinity-${{ github.run_id }}/pytest.xml
+# TODO: run data tests after the dependency conflict is resolved
diff --git a/docs/sphinx_doc/source/tutorial/trinity_configs.md b/docs/sphinx_doc/source/tutorial/trinity_configs.md
index 0c84e72b46..0ef8e93db5 100644
--- a/docs/sphinx_doc/source/tutorial/trinity_configs.md
+++ b/docs/sphinx_doc/source/tutorial/trinity_configs.md
@@ -307,7 +307,6 @@ critic:
   ppo_mini_batch_size: ${actor_rollout_ref.actor.ppo_mini_batch_size}
   # ppo_micro_batch_size: 8 # will be deprecated, use ppo_micro_batch_size_per_gpu
   ppo_micro_batch_size_per_gpu: 8
-  forward_micro_batch_size: ${critic.ppo_micro_batch_size}
  forward_micro_batch_size_per_gpu: ${critic.ppo_micro_batch_size_per_gpu}
  use_dynamic_bsz: ${actor_rollout_ref.actor.use_dynamic_bsz}
  ppo_max_token_len_per_gpu: 32768 # (${actor_rollout_ref.actor.ppo_max_token_len_per_gpu}) * 2
diff --git a/examples/dpo_humanlike/train_dpo.yaml b/examples/dpo_humanlike/train_dpo.yaml
index 477efcbb9f..ae7689106b 100644
--- a/examples/dpo_humanlike/train_dpo.yaml
+++ b/examples/dpo_humanlike/train_dpo.yaml
@@ -118,7 +118,6 @@ critic:
   ppo_mini_batch_size: ${actor_rollout_ref.actor.ppo_mini_batch_size}
   # ppo_micro_batch_size: 8 # will be deprecated, use ppo_micro_batch_size_per_gpu
   ppo_micro_batch_size_per_gpu: 1
-  forward_micro_batch_size: ${critic.ppo_micro_batch_size}
   forward_micro_batch_size_per_gpu: ${critic.ppo_micro_batch_size_per_gpu}
   use_dynamic_bsz: ${actor_rollout_ref.actor.use_dynamic_bsz}
   ppo_max_token_len_per_gpu: 32768 # (${actor_rollout_ref.actor.ppo_max_token_len_per_gpu}) * 2
diff --git a/examples/grpo_alfworld/train_alfworld.yaml b/examples/grpo_alfworld/train_alfworld.yaml
index c77686ac59..0a1c109754 100644
--- a/examples/grpo_alfworld/train_alfworld.yaml
+++ b/examples/grpo_alfworld/train_alfworld.yaml
@@ -117,7 +117,6 @@ critic:
   ppo_mini_batch_size: ${actor_rollout_ref.actor.ppo_mini_batch_size}
   # ppo_micro_batch_size: 8 # will be deprecated, use ppo_micro_batch_size_per_gpu
   ppo_micro_batch_size_per_gpu: 1
-  forward_micro_batch_size: ${critic.ppo_micro_batch_size}
   forward_micro_batch_size_per_gpu: ${critic.ppo_micro_batch_size_per_gpu}
   use_dynamic_bsz: ${actor_rollout_ref.actor.use_dynamic_bsz}
   ppo_max_token_len_per_gpu: 16384 # (${actor_rollout_ref.actor.ppo_max_token_len_per_gpu}) * 2
diff --git a/examples/grpo_gsm8k/train_gsm8k.yaml b/examples/grpo_gsm8k/train_gsm8k.yaml
index 7b311bc4eb..eeb64dc746 100644
--- a/examples/grpo_gsm8k/train_gsm8k.yaml
+++ b/examples/grpo_gsm8k/train_gsm8k.yaml
@@ -122,7 +122,6 @@ critic:
   ppo_mini_batch_size: ${actor_rollout_ref.actor.ppo_mini_batch_size}
   # ppo_micro_batch_size: 8 # will be deprecated, use ppo_micro_batch_size_per_gpu
   ppo_micro_batch_size_per_gpu: 64
-  forward_micro_batch_size: ${critic.ppo_micro_batch_size}
   forward_micro_batch_size_per_gpu: ${critic.ppo_micro_batch_size_per_gpu}
   use_dynamic_bsz: ${actor_rollout_ref.actor.use_dynamic_bsz}
   ppo_max_token_len_per_gpu: 32768 # (${actor_rollout_ref.actor.ppo_max_token_len_per_gpu}) * 2
diff --git a/examples/grpo_math/math.yaml b/examples/grpo_math/math.yaml
index d7468c1cb7..ae7e40d9d0 100644
--- a/examples/grpo_math/math.yaml
+++ b/examples/grpo_math/math.yaml
@@ -18,7 +18,6 @@ model:
   max_prompt_tokens: 1024
   max_response_tokens: 3072
   checkpoint_path: /PATH/TO/CHECKPOINT/
-  load_checkpoint: true
 cluster:
   node_num: 1
   gpu_per_node: 8
diff --git a/examples/grpo_math/train_math.yaml b/examples/grpo_math/train_math.yaml
index 0c457281ee..c49fda58ce 100644
--- a/examples/grpo_math/train_math.yaml
+++ b/examples/grpo_math/train_math.yaml
@@ -25,7 +25,6 @@ actor_rollout_ref:
   actor:
     strategy: fsdp # This is for backward-compatibility
     ppo_mini_batch_size: 128
-    # ppo_micro_batch_size: 8 # will be deprecated, use ppo_micro_batch_size_per_gpu
     ppo_micro_batch_size_per_gpu: 4
     use_dynamic_bsz: True # False
     ppo_max_token_len_per_gpu: 16384 # n * ${data.max_prompt_length} + ${data.max_response_length}
@@ -62,7 +61,6 @@ actor_rollout_ref:
       wrap_policy:
         # transformer_layer_cls_to_wrap: None
         min_num_params: 0
-    # log_prob_micro_batch_size: 4 # will be deprecated, use log_prob_micro_batch_size_per_gpu
     log_prob_micro_batch_size_per_gpu: 16
     log_prob_use_dynamic_bsz: ${actor_rollout_ref.actor.use_dynamic_bsz}
     log_prob_max_token_len_per_gpu: ${actor_rollout_ref.actor.ppo_max_token_len_per_gpu}
@@ -86,7 +84,6 @@ actor_rollout_ref:
     max_num_batched_tokens: 8192
     max_model_len: null
     max_num_seqs: 1024
-    # log_prob_micro_batch_size: 8 # will be deprecated, use log_prob_micro_batch_size_per_gpu
     log_prob_micro_batch_size_per_gpu: 4
     log_prob_use_dynamic_bsz: ${actor_rollout_ref.actor.use_dynamic_bsz}
     log_prob_max_token_len_per_gpu: ${actor_rollout_ref.actor.ppo_max_token_len_per_gpu}
@@ -120,9 +117,7 @@ critic:
         min_num_params: 0
     fsdp_size: -1
   ppo_mini_batch_size: ${actor_rollout_ref.actor.ppo_mini_batch_size}
-  # ppo_micro_batch_size: 8 # will be deprecated, use ppo_micro_batch_size_per_gpu
   ppo_micro_batch_size_per_gpu: 64
-  forward_micro_batch_size: ${critic.ppo_micro_batch_size}
   forward_micro_batch_size_per_gpu: ${critic.ppo_micro_batch_size_per_gpu}
   use_dynamic_bsz: ${actor_rollout_ref.actor.use_dynamic_bsz}
   ppo_max_token_len_per_gpu: 32768 # (${actor_rollout_ref.actor.ppo_max_token_len_per_gpu}) * 2
diff --git a/examples/grpo_sciworld/train_sciworld.yaml b/examples/grpo_sciworld/train_sciworld.yaml
index a818dcb0c6..880fc61fcc 100644
--- a/examples/grpo_sciworld/train_sciworld.yaml
+++ b/examples/grpo_sciworld/train_sciworld.yaml
@@ -25,7 +25,6 @@ actor_rollout_ref:
   actor:
     strategy: fsdp # This is for backward-compatibility
     ppo_mini_batch_size: 1536
-    # ppo_micro_batch_size: 8 # will be deprecated, use ppo_micro_batch_size_per_gpu
     ppo_micro_batch_size_per_gpu: 1
     use_dynamic_bsz: False
     ppo_max_token_len_per_gpu: 16384 # n * ${data.max_prompt_length} + ${data.max_response_length}
@@ -57,7 +56,6 @@ actor_rollout_ref:
      wrap_policy:
        # transformer_layer_cls_to_wrap: None
        min_num_params: 0
-    # log_prob_micro_batch_size: 4 # will be deprecated, use log_prob_micro_batch_size_per_gpu
     log_prob_micro_batch_size_per_gpu: 1
     log_prob_use_dynamic_bsz: ${actor_rollout_ref.actor.use_dynamic_bsz}
     log_prob_max_token_len_per_gpu: ${actor_rollout_ref.actor.ppo_max_token_len_per_gpu}
@@ -81,7 +79,6 @@ actor_rollout_ref:
     max_num_batched_tokens: 8192
     max_model_len: null
     max_num_seqs: 1024
-    # log_prob_micro_batch_size: 8 # will be deprecated, use log_prob_micro_batch_size_per_gpu
     log_prob_micro_batch_size_per_gpu: 1
     log_prob_use_dynamic_bsz: ${actor_rollout_ref.actor.use_dynamic_bsz}
     log_prob_max_token_len_per_gpu: ${actor_rollout_ref.actor.ppo_max_token_len_per_gpu}
@@ -115,9 +112,7 @@ critic:
         min_num_params: 0
     fsdp_size: -1
   ppo_mini_batch_size: ${actor_rollout_ref.actor.ppo_mini_batch_size}
-  # ppo_micro_batch_size: 8 # will be deprecated, use ppo_micro_batch_size_per_gpu
   ppo_micro_batch_size_per_gpu: 1
-  forward_micro_batch_size: ${critic.ppo_micro_batch_size}
   forward_micro_batch_size_per_gpu: ${critic.ppo_micro_batch_size_per_gpu}
   use_dynamic_bsz: ${actor_rollout_ref.actor.use_dynamic_bsz}
   ppo_max_token_len_per_gpu: 16384 # (${actor_rollout_ref.actor.ppo_max_token_len_per_gpu}) * 2
@@ -140,7 +135,6 @@ reward_model:
        min_num_params: 0
      param_offload: False
      fsdp_size: -1
-  # micro_batch_size: null # will be deprecated, use micro_batch_size_per_gpu
   # micro_batch_size_per_gpu: 2 # set a number
   # max_length: null
   ulysses_sequence_parallel_size: 1 # sp size
diff --git a/examples/grpo_webshop/train_webshop.yaml b/examples/grpo_webshop/train_webshop.yaml
index bb61134cbe..aac06b7043 100644
--- a/examples/grpo_webshop/train_webshop.yaml
+++ b/examples/grpo_webshop/train_webshop.yaml
@@ -117,7 +117,6 @@ critic:
   ppo_mini_batch_size: ${actor_rollout_ref.actor.ppo_mini_batch_size}
   # ppo_micro_batch_size: 8 # will be deprecated, use ppo_micro_batch_size_per_gpu
   ppo_micro_batch_size_per_gpu: 1
-  forward_micro_batch_size: ${critic.ppo_micro_batch_size}
   forward_micro_batch_size_per_gpu: ${critic.ppo_micro_batch_size_per_gpu}
   use_dynamic_bsz: ${actor_rollout_ref.actor.use_dynamic_bsz}
   ppo_max_token_len_per_gpu: 16384 # (${actor_rollout_ref.actor.ppo_max_token_len_per_gpu}) * 2
diff --git a/examples/opmd_gsm8k/train_opmd_gsm8k.yaml b/examples/opmd_gsm8k/train_opmd_gsm8k.yaml
index 28f3ffcef9..a82f4d0739 100644
--- a/examples/opmd_gsm8k/train_opmd_gsm8k.yaml
+++ b/examples/opmd_gsm8k/train_opmd_gsm8k.yaml
@@ -149,7 +149,6 @@ critic:
   ppo_mini_batch_size: ${actor_rollout_ref.actor.ppo_mini_batch_size}
   # ppo_micro_batch_size: 8 # will be deprecated, use ppo_micro_batch_size_per_gpu
   ppo_micro_batch_size_per_gpu: 64
-  forward_micro_batch_size: ${critic.ppo_micro_batch_size}
   forward_micro_batch_size_per_gpu: ${critic.ppo_micro_batch_size_per_gpu}
   use_dynamic_bsz: ${actor_rollout_ref.actor.use_dynamic_bsz}
   ppo_max_token_len_per_gpu: 32768 # (${actor_rollout_ref.actor.ppo_max_token_len_per_gpu}) * 2
diff --git a/examples/ppo_countdown/train_countdown.yaml b/examples/ppo_countdown/train_countdown.yaml
index 58e608301d..53a1401c4b 100644
--- a/examples/ppo_countdown/train_countdown.yaml
+++ b/examples/ppo_countdown/train_countdown.yaml
@@ -124,7 +124,6 @@ critic:
   ppo_mini_batch_size: ${actor_rollout_ref.actor.ppo_mini_batch_size}
   # ppo_micro_batch_size: 8 # will be deprecated, use ppo_micro_batch_size_per_gpu
   ppo_micro_batch_size_per_gpu: 8
-  forward_micro_batch_size: ${critic.ppo_micro_batch_size}
   forward_micro_batch_size_per_gpu: ${critic.ppo_micro_batch_size_per_gpu}
   use_dynamic_bsz: ${actor_rollout_ref.actor.use_dynamic_bsz}
   ppo_max_token_len_per_gpu: 32768 # (${actor_rollout_ref.actor.ppo_max_token_len_per_gpu}) * 2
diff --git a/tests/buffer/__init__.py b/tests/buffer/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/tests/common/__init__.py b/tests/common/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/tests/common/config_test.py b/tests/common/config_test.py
index 3e210df993..2dceac2b96 100644
--- a/tests/common/config_test.py
+++ b/tests/common/config_test.py
@@ -11,6 +11,8 @@ class TestConfig(unittest.TestCase):
 
     def test_load_default_config(self):
         config = load_config(config_yaml_path)
+        print(config.data)
+        config.check_and_update()
         self.assertIsNotNone(config.trainer.trainer_config)
         self.assertEqual(config.trainer.trainer_config.trainer.n_gpus_per_node, 4)
         self.assertEqual(config.trainer.trainer_config.trainer.nnodes, 1)
@@ -22,13 +24,15 @@ def test_load_default_config(self):
             config.synchronizer.sync_iteration_interval,
         )
 
-    def test_all_examples_are_valid(self):  # TODO: useless
-        example_dir = os.path.join(os.path.dirname(__file__), "..", "..", "scripts", "config")
-        for filename in ["countdown", "gsm8k"]:
-            if filename.endswith(".yaml"):
-                config_path = os.path.join(example_dir, filename)
-                try:
-                    load_config(config_path)
-                except Exception as e:
-                    print(f"Error loading config {config_path}: {e}")
-                    raise e
+    def test_all_examples_are_valid(self):
+        example_dir = os.path.join(os.path.dirname(__file__), "..", "..", "examples")
+        for example_name in os.listdir(example_dir):
+            for filename in os.listdir(os.path.join(example_dir, example_name)):
+                if filename.endswith(".yaml") and not filename.startswith("train"):
+                    print(f"Checking config: {filename}")
+                    config_path = os.path.join(example_dir, example_name, filename)
+                    try:
+                        load_config(config_path)
+                    except Exception as e:
+                        print(f"Error loading config {config_path}: {e}")
+                        raise e
diff --git a/tests/manager/storage_test.py b/tests/common/experience_test.py
similarity index 90%
rename from tests/manager/storage_test.py
rename to tests/common/experience_test.py
index 1ce344b10e..11ad2d1d4e 100644
--- a/tests/manager/storage_test.py
+++ b/tests/common/experience_test.py
@@ -20,7 +20,7 @@ def test_experience_model_experience_conversion(self):
         tokens = torch.tensor([1, 2, 3], dtype=torch.int32)
         reward = 0.6
         prompt_length = 2
-        logprobs = torch.tensor([0.1], dtype=torch.float32)
+        logprobs = torch.tensor([0, 0, 0.1], dtype=torch.float32)
         action_mask = torch.tensor([1, 0, 1], dtype=torch.bool)
         experience = Experience(
             tokens=tokens,
@@ -44,28 +44,28 @@ def test_batch_conversion(self):
                 tokens=torch.tensor([1, 2]),
                 prompt_length=1,
                 reward=float(0.1),
-                logprobs=torch.tensor([0.1]),
+                logprobs=torch.tensor([0, 0.1]),
                 action_mask=torch.tensor([1, 0]),
             ),
             Experience(
                 tokens=torch.tensor([1, 2, 3]),
                 prompt_length=2,
                 reward=float(0.2),
-                logprobs=torch.tensor([0.1]),
+                logprobs=torch.tensor([0, 0, 0.1]),
                 action_mask=torch.tensor([1, 0, 1]),
             ),
             Experience(
                 tokens=torch.tensor([1, 2, 3, 4]),
                 prompt_length=2,
                 reward=float(0.3),
-                logprobs=torch.tensor([0.1, 0.2]),
+                logprobs=torch.tensor([0, 0, 0.1, 0.2]),
                 action_mask=torch.tensor([1, 0, 1, 0]),
             ),
             Experience(
                 tokens=torch.tensor([1, 2, 3, 4]),
                 prompt_length=3,
                 reward=float(0.4),
-                logprobs=torch.tensor([0.1]),
+                logprobs=torch.tensor([0, 0, 0, 0.1]),
                 action_mask=torch.tensor([1, 0, 1, 0]),
             ),
         ]
@@ -89,7 +89,8 @@ def test_batch_conversion(self):
         self.assertTrue(
             torch.all(
                 batch.logprobs[i][
-                    prompt_length : prompt_length
+                    prompt_length
+                    - exps[i].prompt_length : prompt_length
                     + exps[i].tokens.size(0)
                     - exps[i].prompt_length
                 ]
diff --git a/tests/common/tmp/template_config.yaml b/tests/common/tmp/template_config.yaml
index 8f7c20df06..2cae62acdf 100644
--- a/tests/common/tmp/template_config.yaml
+++ b/tests/common/tmp/template_config.yaml
@@ -2,27 +2,11 @@ mode: both
 data:
   dataset_path: ''
   total_epoch: 1
-  batch_size: 1
+  batch_size: 32
   train_split: 'train'
   eval_split: ''
   default_workflow_type: ''
   default_reward_fn_type: ''
-  dataset_config: {}
-  format_config:
-    prompt_key: ''
-    response_key: ''
-    chat_template: ''
-    reward_fn_key: ''
-    workflow_key: ''
-    solution_key: ''
-    reward_key: ''
-    chosen_key: ''
-    rejected_key: ''
-    label_key: ''
-  dj_config_path: null
-  dj_process_desc: null
-  clean_strategy: iterative
-  min_size_ratio: null
 model:
   model_path: ''
   max_prompt_tokens: 2048
@@ -32,8 +16,6 @@ cluster:
   node_num: 1
   gpu_per_node: 8
 buffer:
-  storage_type: sql
-  db_url: ''
   read_batch_size: 32
   max_retry_times: 3
   max_retry_interval: 1
@@ -46,8 +28,8 @@ explorer:
   enable_prefix_caching: false
   enforce_eager: true
   dtype: bfloat16
-  temperature: 0.0
-  top_p: 1.0
+  temperature: 0.2
+  top_p: 0.95
   top_k: -1
   seed: 42
   logprobs: 0
diff --git a/tests/vllm_test.py b/tests/common/vllm_test.py
similarity index 58%
rename from tests/vllm_test.py
rename to tests/common/vllm_test.py
index 6ca5ed0513..ab39cc5c66 100644
--- a/tests/vllm_test.py
+++ b/tests/common/vllm_test.py
@@ -13,7 +13,16 @@
     tokenize_and_mask_messages_hf,
 )
 
-config_dir = os.path.join(os.path.dirname(__file__), "test_data", "template.yaml")
+config_dir = os.path.join(os.path.dirname(__file__), "tmp", "template_config.yaml")
+
+
+def get_model_path() -> str:
+    path = os.environ.get("MODEL_PATH")
+    if not path:
+        raise EnvironmentError(
+            "Please set `export MODEL_PATH=` before running this test."
+        )
+    return path
 
 
 CHAT_TEMPLATE = r"""
@@ -76,129 +85,11 @@
 """
 
 
-@unittest.skip("Skip VLLM test")
-class TestSyncvLLMModel(unittest.TestCase):
-    def setUp(self):
-        ray.init(ignore_reinit_error=True)
-        self.config = load_config(config_dir)
-        self.engines = create_rollout_models(self.config)
-        self.assertEqual(len(self.engines), self.config.explorer.engine_num)
-
-    def test_generate(self):
-        prompts = [
-            "Hello, world!",
-            "Hello, my name is",
-        ]
-        cluster_results = []
-        for engine in self.engines:
-            cluster_results.extend(ray.get(engine.generate.remote(prompts)))
-
-        self.assertEqual(
-            len(cluster_results),
-            len(self.engines) * len(prompts) * self.config.explorer.repeat_times,
-        )
-        for i in range(len(self.engines)):
-            for j in range(len(prompts)):
-                for k in range(self.config.explorer.repeat_times):
-                    self.assertEqual(
-                        cluster_results[
-                            i * len(prompts) * self.config.explorer.repeat_times
-                            + j * self.config.explorer.repeat_times
-                            + k
-                        ].prompt_text,
-                        prompts[j],
-                    )
-
-    def test_chat_and_logprobs(self):
-        messages = [
-            {"role": "system", "content": "You are a helpful assistant."},
-            {"role": "user", "content": "Hello, world!"},
-        ]
-        cluster_results = []
-        for engine in self.engines:
-            cluster_results.extend(ray.get(engine.chat.remote(messages)))
-        self.assertEqual(
-            len(cluster_results), len(self.engines) * self.config.explorer.repeat_times
-        )
-        for i in range(len(self.engines)):
-            for k in range(self.config.explorer.repeat_times):
-                self.assertIn(
-                    "Hello, world!",
-                    cluster_results[i * self.config.explorer.repeat_times + k].prompt_text,
-                )
-                self.assertIn(
-                    "You are a helpful assistant.",
-                    cluster_results[i * self.config.explorer.repeat_times + k].prompt_text,
-                )
-        logprobs = ray.get(self.engines[0].logprobs.remote(cluster_results[0].tokens))
-        self.assertEqual(logprobs.shape[0], cluster_results[0][0].tokens.shape[0] - 1)
-
-
-@unittest.skip("Skip VLLM test")
-class TestAsyncvLLMModel(unittest.TestCase):
-    def setUp(self):
-        ray.init(ignore_reinit_error=True)
-        self.config = load_config(config_dir)
-        self.config.explorer.engine_type = "vllm_async"
-        self.engines = create_rollout_models(self.config)
-        self.assertEqual(len(self.engines), self.config.explorer.engine_num)
-
-    def test_generate(self):
-        prompts = ["Hello, world!", "Hi, my name is", "How are you?", "What's up?"]
-        cluster_results = []
-        refs = []
-        for engine in self.engines:
-            for prompt in prompts:
-                refs.append(engine.generate_async.remote(prompt))
-        cluster_results = ray.get(refs)
-
-        self.assertEqual(
-            len(cluster_results),
-            len(self.engines) * self.config.explorer.repeat_times * len(prompts),
-        )
-
-    def test_chat_and_logprobs(self):
-        messages = [
-            [
-                {"role": "system", "content": "You are a helpful assistant."},
-                {"role": "user", "content": "Hello, world!"},
-            ],
-            [
-                {"role": "system", "content": "You are a helpful assistant."},
-                {"role": "user", "content": "Please tell me about yourself."},
-            ],
-            [
-                {"role": "system", "content": "You are a helpful assistant."},
-                {"role": "user", "content": "Please tell me a joke."},
-            ],
-        ]
-        cluster_results = []
-        refs = []
-        for engine in self.engines:
-            for message in messages:
-                refs.append(engine.chat_async.remote(message))
-        cluster_results = ray.get(refs)
-
-        self.assertEqual(
-            len(cluster_results),
-            len(self.engines) * self.config.explorer.repeat_times * len(messages),
-        )
-        logprobs_refs = []
-        for i, messages in enumerate(messages):
-            token_ids = cluster_results[i][0].tokens.tolist()
-            logprobs_refs.append(self.engines[0].logprobs_async.remote(token_ids))
-        logprobs = ray.get(logprobs_refs)
-        for i, messages in enumerate(messages):
-            self.assertEqual(logprobs[i].shape[0], cluster_results[i][0].tokens.shape[0])
-
-
-class TestModelWrapper:
+class BaseTestModelWrapper:
     def test_generate(self):
         prompts = ["Hello, world!", "Hello, my name is"]
         results = self.model_wrapper.generate(prompts)
         self.assertEqual(len(results), len(prompts) * self.config.explorer.repeat_times)
-
-    def test_chat_and_logprobs(self):
         messages = [
             {"role": "system", "content": "You are a helpful assistant."},
             {"role": "user", "content": "What's the weather like today?"},
@@ -235,11 +126,11 @@ def test_chat_and_logprobs(self):
         self.assertTrue(torch.equal(result_dict["input_ids"][0], exp.tokens))
 
 
-# @unittest.skip("Skip VLLM test")
-class TestModelWrapperSync(TestModelWrapper, unittest.TestCase):
+class TestModelWrapperSync(BaseTestModelWrapper, unittest.TestCase):
     def setUp(self):
         ray.init(ignore_reinit_error=True)
         self.config = load_config(config_dir)
+        self.config.model.model_path = get_model_path()
        self.config.explorer.engine_type = "vllm"
         self.config.explorer.engine_num = 1
         self.config.explorer.chat_template = CHAT_TEMPLATE
@@ -247,11 +138,11 @@ def setUp(self):
         self.engines = create_rollout_models(self.config)
         self.model_wrapper = ModelWrapper(self.engines[0], model_type="vllm")
 
-# @unittest.skip("Skip VLLM test")
-class TestModelWrapperAsync(TestModelWrapper, unittest.TestCase):
+class TestModelWrapperAsync(BaseTestModelWrapper, unittest.TestCase):
     def setUp(self):
         ray.init(ignore_reinit_error=True)
         self.config = load_config(config_dir)
+        self.config.model.model_path = get_model_path()
         self.config.explorer.engine_type = "vllm_async"
         self.config.explorer.engine_num = 1
         self.config.explorer.chat_template = CHAT_TEMPLATE
@@ -274,7 +165,7 @@ def test_assistant_token_mask(self):
                 "content": "You're welcome! If you have any other questions, feel free to ask.",
             },
         ]
-        tokenizer = AutoTokenizer.from_pretrained("/nas/checkpoints/Qwen25-1.5B-instruct")
+        tokenizer = AutoTokenizer.from_pretrained(get_model_path())
         token_ids, action_mask = tokenize_and_mask_messages_default(
             tokenizer=tokenizer,
             messages=messages,
diff --git a/tests/explorer/__init__.py b/tests/explorer/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/tests/explorer/runner_pool_test.py b/tests/explorer/runner_pool_test.py
index 1f5244423c..9f3597216a 100644
--- a/tests/explorer/runner_pool_test.py
+++ b/tests/explorer/runner_pool_test.py
@@ -6,13 +6,14 @@
 import ray
 import torch
 
-from trinity.common.config import load_config
+from trinity.buffer.reader.queue_reader import QueueReader
+from trinity.common.config import DatasetConfig, load_config
+from trinity.common.constants import AlgorithmType, StorageType
 from trinity.common.experience import Experience
 from trinity.common.models.model import InferenceModel
 from trinity.common.task import Task
 from trinity.common.workflows.workflow import WORKFLOWS, Workflow
 from trinity.explorer.runner_pool import RunnerPool
-from trinity.manager.queue_storage import QueueStorage
 
 config_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "test_data", "template.yaml")
 
@@ -62,13 +63,16 @@ def setUp(self):
         ray.init(ignore_reinit_error=True)
         self.config = load_config(config_dir)
         self.config.explorer.runner_num = 2
-        self.config.buffer.storage_type = "queue"
-        self.config.buffer.db_url = None
         self.config.explorer.max_retry_times = 0
         self.config.explorer.max_timeout = 5
         self.config.buffer.read_batch_size = 2
         self.config.buffer.pad_token_id = 0
-        self.queue = QueueStorage(self.config.buffer)
+        self.config.buffer.train_dataset = DatasetConfig(
+            name="test",
+            storage_type=StorageType.QUEUE,
+            algorithm_type=AlgorithmType.PPO,
+        )
+        self.queue = QueueReader(self.config.buffer.train_dataset, self.config.buffer)
 
     def test_runner_pool(self):
         pool = RunnerPool(self.config, [DummyModel.remote(), DummyModel.remote()])
@@ -111,18 +115,18 @@ def test_runner_pool(self):
         self.assertTrue(et - st < 5)
         self.assertEqual(len(status), 1)
         self.assertFalse(status[0].ok)
-        # 2. `timeout_5
+        # 2. `timeout_2
         st = time.time()
         status = pool.get_next_unorder()
         et = time.time()
-        self.assertTrue(et - st < 5)
+        self.assertTrue(et - st < 3)
         self.assertEqual(len(status), 1)
         self.assertTrue(status[0].ok)
         # 3. `success`
         st = time.time()
         status = pool.get_next_unorder()
         et = time.time()
-        self.assertTrue(et - st < 5)
+        self.assertTrue(et - st < 1)
         self.assertEqual(len(status), 1)
         self.assertTrue(status[0].ok)
         # 4. `timeout_100`and `timeout_101`
@@ -135,13 +139,10 @@ def test_runner_pool(self):
         self.assertFalse(status[1].ok)
 
         # 5.`exit`
-        st = time.time()
         status = pool.get_next_unorder()
-        et = time.time()
-        self.assertTrue(et - st < 5)
         self.assertEqual(len(status), 1)
         self.assertFalse(status[0].ok)
-        exps = self.queue.get_experiences()
-        self.assertEqual(exps.batch_size, 2)  # `timeout_2` and `success`
+        exps = self.queue.read()
+        self.assertEqual(len(exps), 2)  # `timeout_2` and `success`
 
         self.assertEqual(len(pool._idle_actors), self.config.explorer.runner_num)
diff --git a/tests/explorer/workflow_test.py b/tests/explorer/workflow_test.py
index 62eac4412c..e27c75a95d 100644
--- a/tests/explorer/workflow_test.py
+++ b/tests/explorer/workflow_test.py
@@ -2,7 +2,7 @@
 """Test for the workflow module"""
 import unittest
 from dataclasses import dataclass
-from unittest.mock import MagicMock, patch
+from unittest.mock import MagicMock
 
 from trinity.common.workflows import MathWorkflow
 
@@ -14,9 +14,9 @@ class MockResponse:
 
 
 class WorkflowTest(unittest.TestCase):
-    @patch("ray.get")
-    def test_math_workflow(self, mock_ray_get) -> None:
-        mock_ray_get.return_value = [
+    def test_math_workflow(self) -> None:
+        model = MagicMock()
+        model.chat.return_value = [
             MockResponse(r"\boxed{2}"),
             MockResponse(r"\boxted{3}"),
             MockResponse(r"2"),
@@ -27,23 +27,23 @@ def test_math_workflow(self, mock_ray_get) -> None:
             MockResponse("\nOnly thinking\n"),
             MockResponse("ThinkingAnswer is not end1"),
         ]
-        model = MagicMock()
         workflow = MathWorkflow(model=model, task_desc="1+1=", truth="2")
         experiences = workflow.run()
+        print(experiences)
         self.assertEqual(len(experiences), 9)
-        self.assertEqual(experiences[0].reward, 1.0)
-        self.assertEqual(experiences[1].reward, 0.0)
-        self.assertEqual(experiences[2].reward, 0.0)
-        self.assertEqual(experiences[3].reward, 1.0)
-        self.assertEqual(experiences[4].reward, 2.0)
-        self.assertEqual(experiences[5].reward, 1.0)
-        self.assertEqual(experiences[6].reward, 0.0)
-        self.assertEqual(experiences[7].reward, 0.0)
-        self.assertEqual(experiences[8].reward, 0.0)
+        self.assertEqual(experiences[0].reward, 0.9)
+        self.assertEqual(experiences[1].reward, -0.1)
+        self.assertEqual(experiences[2].reward, 0.9)
+        self.assertEqual(experiences[3].reward, 0.1)
+        self.assertEqual(experiences[4].reward, 1.1)
+        self.assertEqual(experiences[5].reward, 0.9)
+        self.assertEqual(experiences[6].reward, -0.1)
+        self.assertEqual(experiences[7].reward, -0.1)
+        self.assertEqual(experiences[8].reward, -0.1)
 
-    @patch("ray.get")
-    def test_math_fraction_workflow(self, mock_ray_get) -> None:
-        mock_ray_get.return_value = [
+    def test_math_fraction_workflow(self) -> None:
+        model = MagicMock()
+        model.chat.return_value = [
             MockResponse(r"\boxed{\frac{40}{400}}"),
             MockResponse(r"\boxed{\frac{1}{10}}"),
             MockResponse(r"\boxed{0.1}"),
@@ -51,25 +51,23 @@ def test_math_fraction_workflow(self, mock_ray_get) -> None:
             MockResponse(r"\boxed{\frac{1} {10}}"),
             MockResponse(r"The answer is \boxed{\frac{40}{400}}"),
         ]
-        model = MagicMock()
         workflow = MathWorkflow(model=model, task_desc=r"\frac{40}{400}", truth=r"\frac{40}{400}")
         experiences = workflow.run()
         self.assertEqual(len(experiences), 6)
-        self.assertEqual(experiences[0].reward, 1.0)
-        self.assertEqual(experiences[1].reward, 1.0)
-        self.assertEqual(experiences[2].reward, 1.0)
-        self.assertEqual(experiences[3].reward, 1.0)
-        self.assertEqual(experiences[4].reward, 1.0)
-        self.assertEqual(experiences[5].reward, 1.0)
+        self.assertEqual(experiences[0].reward, 0.9)
+        self.assertEqual(experiences[1].reward, 0.9)
+        self.assertEqual(experiences[2].reward, 0.9)
+        self.assertEqual(experiences[3].reward, 0.9)
+        self.assertEqual(experiences[4].reward, 0.9)
+        self.assertEqual(experiences[5].reward, 0.9)
 
-    @patch("ray.get")
-    def test_math_complex_workflow(self, mock_ray_get) -> None:
-        mock_ray_get.return_value = [
+    def test_math_complex_workflow(self) -> None:
+        model = MagicMock()
+        model.chat.return_value = [
             MockResponse(
                 r"$\boxed{\dfrac{108 + 31\sqrt{5}}{216}} \quad \text{and} \quad \boxed{\dfrac{108 - 31\sqrt{5}}{216}}$"
             ),
         ]
-        model = MagicMock()
         workflow = MathWorkflow(
             model=model,
             task_desc="",
@@ -77,16 +75,15 @@
         )
         experiences = workflow.run()
         self.assertEqual(len(experiences), 1)
-        self.assertEqual(experiences[0].reward, 1.0)
+        self.assertEqual(experiences[0].reward, 0.9)
 
-    @patch("ray.get")
-    def test_gsm8k_workflow(self, mock_ray_get) -> None:
-        mock_ray_get.return_value = [
+    def test_gsm8k_workflow(self) -> None:
+        model = MagicMock()
+        model.chat.return_value = [
             MockResponse(" balabalabala 99 \n 36 "),
             MockResponse(" 36.0 "),
             MockResponse("Kim's total points are 6 + 30 = 36 "),
         ]
-        model = MagicMock()
         workflow = MathWorkflow(
             model=model,
             task_desc="",
diff --git a/tests/test_data/template.yaml b/tests/test_data/template.yaml
index 7ae0b3fbce..4a66d9e911 100644
--- a/tests/test_data/template.yaml
+++ b/tests/test_data/template.yaml
@@ -1,7 +1,7 @@
 data:
   dataset_path: ''
+  batch_size: 32
 model:
-  model_path: '/nas/checkpoints/Qwen25-7B-Instruct'
   max_prompt_tokens: 2048
   max_response_tokens: 2048
   checkpoint_path: ''
@@ -9,8 +9,6 @@ cluster:
   node_num: 1
   gpu_per_node: 8
 buffer:
-  storage_type: dict
-  db_url: ''
   read_batch_size: 32
   max_retry_times: 3
   max_retry_interval: 1
diff --git a/trinity/common/verl_config.py b/trinity/common/verl_config.py
index 4e08958337..4e85fdb5bc 100644
--- a/trinity/common/verl_config.py
+++ b/trinity/common/verl_config.py
@@ -68,8 +68,7 @@ class Checkpoint:
 class Actor:
     strategy: str = "fsdp"
     ppo_mini_batch_size: int = 256
-    ppo_micro_batch_size: Optional[int] = None
-    ppo_micro_batch_size_per_gpu: Optional[int] = None
+    ppo_micro_batch_size_per_gpu: int = 1
     use_dynamic_bsz: bool = False
     ppo_max_token_len_per_gpu: int = (
         16384  # n * ${data.max_prompt_length} + ${data.max_response_length}
@@ -95,8 +94,7 @@ class Actor:
 @dataclass
 class Ref:
     fsdp_config: FSDPConfig = field(default_factory=FSDPConfig)
-    log_prob_micro_batch_size: Optional[int] = None
-    log_prob_micro_batch_size_per_gpu: Optional[int] = None
+    log_prob_micro_batch_size_per_gpu: int = 1
     log_prob_use_dynamic_bsz: bool = False
     log_prob_max_token_len_per_gpu: int = 0
     ulysses_sequence_parallel_size: int = 1
@@ -121,8 +119,7 @@ class Rollout:
     max_num_batched_tokens: int = 8192
     max_model_len: Optional[int] = None
     max_num_seqs: int = 1024
-    log_prob_micro_batch_size: Optional[int] = None
-    log_prob_micro_batch_size_per_gpu: Optional[int] = None
+    log_prob_micro_batch_size_per_gpu: int = 1
     log_prob_use_dynamic_bsz: bool = False
     log_prob_max_token_len_per_gpu: int = 0
     disable_log_stats: bool = True
@@ -158,8 +155,7 @@ class Critic:
     optim: Optim = field(default_factory=Optim)
     model: CriticModel = field(default_factory=CriticModel)
     ppo_mini_batch_size: int = 0
-    ppo_micro_batch_size: Optional[int] = None
-    ppo_micro_batch_size_per_gpu: Optional[int] = None
+    ppo_micro_batch_size_per_gpu: int = 1
     forward_micro_batch_size: Optional[int] = None
     forward_micro_batch_size_per_gpu: Optional[int] = None
     use_dynamic_bsz: bool = False
@@ -187,8 +183,7 @@ class RewardModel:
     enable: bool = False
     strategy: str = "fsdp"
     model: _RewardModel = field(default_factory=_RewardModel)
-    micro_batch_size: Optional[int] = None
-    micro_batch_size_per_gpu: Optional[int] = None
+    micro_batch_size_per_gpu: int = 1
     max_length: Optional[int] = None
     ulysses_sequence_parallel_size: int = 1
     use_dynamic_bsz: bool = False
@@ -323,26 +318,6 @@ def synchronize_config(self, config: Config) -> None:
                 f"batch_size_per_gpu ({batch_size_per_gpu}) must be divisible by "
                 f"critic.ppo_micro_batch_size_per_gpu ({self.critic.ppo_micro_batch_size_per_gpu})"
             )
-        if self.actor_rollout_ref.actor.ppo_micro_batch_size is not None:
-            raise ValueError(
-                "`actor_rollout_ref.actor.ppo_micro_batch_size` will be deprecated, "
-                "please use `actor_rollout_ref.`actor.ppo_micro_batch_size_per_gpu` instead."
-            )
-        if self.actor_rollout_ref.ref.log_prob_micro_batch_size is not None:
-            raise ValueError(
-                "`actor_rollout_ref.ref.log_prob_micro_batch_size` will be deprecated,"
-                "please use `actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu` instead."
-            )
-        if self.critic.ppo_micro_batch_size is not None:
-            raise ValueError(
-                "`critic.ppo_micro_batch_size` will be deprecated, "
-                "please use `critic.ppo_micro_batch_size_per_gpu` instead."
-            )
-        if self.reward_model.enable and self.reward_model.micro_batch_size is not None:
-            raise ValueError(
-                "`reward_model.micro_batch_size` will be deprecated, "
-                "please use `reward_model.micro_batch_size_per_gpu` instead."
-            )
 
         # TODO: check other fields
         self.enable_preview = config.trainer.enable_preview
diff --git a/trinity/explorer/explorer.py b/trinity/explorer/explorer.py
index 6beac2b049..84c7bfca30 100644
--- a/trinity/explorer/explorer.py
+++ b/trinity/explorer/explorer.py
@@ -214,7 +214,7 @@ def explore_step(self) -> Tuple[bool, int]:
         self.logger.info("Explore step finished.")
         return True, self.iteration
 
-    def eval(self, step) -> bool:
+    def eval(self) -> bool:
         """Evaluation on all evaluation data samples."""
         self.logger.info("Evaluation started.")
         st = time.time()
@@ -237,7 +237,7 @@ def eval(self, step) -> bool:
 
         log_metrics = self.monitor.calculate_metrics(all_metrics, prefix="eval")  # type: ignore
         log_metrics["eval/total_time"] = time.time() - st
-        self.monitor.log(log_metrics, step=step)  # type: ignore
+        self.monitor.log(log_metrics, step=self.iteration)  # type: ignore
         return True
 
     def sync_weight(self) -> None:
diff --git a/trinity/explorer/workflow_runner.py b/trinity/explorer/workflow_runner.py
index 59a90d7f62..b2c0c11804 100644
--- a/trinity/explorer/workflow_runner.py
+++ b/trinity/explorer/workflow_runner.py
@@ -8,7 +8,6 @@
 from typing import List, Optional
 
 import ray
-from transformers import AutoTokenizer
 
 from trinity.buffer import get_buffer_writer
 from trinity.common.config import Config
@@ -38,7 +37,6 @@ def __init__(self, config: Config, model: InferenceModel) -> None:
             self.config.buffer.train_dataset,  # type: ignore
             self.config.buffer,
         )
-        self.tokenizer = AutoTokenizer.from_pretrained(config.model.model_path)
         self.model = model
         self.model_wrapper = ModelWrapper(model, config.explorer.engine_type)
         self.logger = get_logger(__name__)