2 changes: 1 addition & 1 deletion docs/sphinx_doc/source/tutorial/example_tinker_backend.md
@@ -211,4 +211,4 @@ synchronizer:

Since Llama-3.2-3B is a base (non-instruct-tuned) model, it has limited ability to follow formatting instructions. Additionally, we trained for only **one epoch**. As a result, both backends achieved final rewards just slightly above 0.1. Nonetheless, the training curves show a clear upward trend in reward, indicating successful learning. The results are visualized below:

![Training Rewards on GSM8K](../../docs/sphinx_doc/assets/tinker-gsm8k.png)
![Training Rewards on GSM8K](../../assets/tinker-gsm8k.png)
@@ -210,4 +210,4 @@ synchronizer:

Since Llama-3.2-3B is a base (non-instruction-tuned) model, its ability to follow formatting instructions is limited, and this experiment trained for only **one epoch**. As a result, the final reward of both backends is only slightly above 0.1. However, the training curves show a clear upward trend in reward, indicating that the model has learned successfully. The results are visualized below:

![GSM8K Training Reward Curve](../../docs/sphinx_doc/assets/tinker-gsm8k.png)
![GSM8K Training Reward Curve](../../assets/tinker-gsm8k.png)
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -82,7 +82,7 @@ megatron = [
"mbridge>=0.13.0",
]
tinker = [
"tinker", # tinker requires python>=3.11
"tinker; python_version >= '3.11'",
]

doc = [
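Replacing the comment with a PEP 508 environment marker makes the Python-version constraint machine-readable: on interpreters older than 3.11 the `tinker` dependency is simply skipped instead of breaking installation of the extra. Below is a minimal sketch of how such a marker is evaluated, using the `packaging` library; the snippet is illustrative and not part of this PR.

```python
# Illustrative only: parse and evaluate the PEP 508 marker used in pyproject.toml.
from packaging.markers import Marker
from packaging.requirements import Requirement

req = Requirement("tinker; python_version >= '3.11'")
print(req.name)               # "tinker"
print(req.marker)             # python_version >= "3.11"
print(req.marker.evaluate())  # True on Python 3.11+, False on older interpreters

# Evaluate against an explicit environment instead of the running interpreter.
print(Marker("python_version >= '3.11'").evaluate({"python_version": "3.10"}))  # False
```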
3 changes: 2 additions & 1 deletion scripts/docker/Dockerfile.uv
@@ -7,7 +7,8 @@
#
# Note:
# 1. This Dockerfile uses 'uv' to create a virtual environment for better package management. If you want a simpler setup without 'uv', please refer to `scripts/docker/Dockerfile`.
# 2. Make sure to use `uv pip` to install packages within the virtual environment.
# 2. The uv virtual environment is created at `/opt/venv`, use `source /opt/venv/bin/activate` to activate it.
# 3. Make sure to use `uv pip` to install packages within the virtual environment.

FROM nvcr.io/nvidia/cuda:12.8.1-cudnn-devel-ubuntu22.04

6 changes: 5 additions & 1 deletion tests/buffer/file_test.py
@@ -109,7 +109,11 @@ def setUp(self):
name="test_buffer", storage_type=StorageType.FILE.value
)
self.config.check_and_update()
ray.init(ignore_reinit_error=True, runtime_env={"env_vars": self.config.get_envs()})
ray.init(
ignore_reinit_error=True,
runtime_env={"env_vars": self.config.get_envs()},
namespace="trinity_unittest",
)
os.makedirs(self.config.buffer.cache_dir, exist_ok=True)
file_path = self.config.buffer.trainer_input.experience_buffer.path
if os.path.exists(file_path):
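Pinning the test to a dedicated Ray namespace keeps its named actors from colliding with, or accidentally reusing, actors created by other jobs on the same cluster, since named actors are only visible inside the namespace they were created in. A minimal sketch of that behavior follows; the actor and names are hypothetical and not taken from this test suite.

```python
# Illustrative only: named actors are scoped to the namespace passed to ray.init().
import ray

@ray.remote
class Counter:
    def __init__(self):
        self.n = 0

    def incr(self):
        self.n += 1
        return self.n

ray.init(namespace="trinity_unittest", ignore_reinit_error=True)
Counter.options(name="counter", lifetime="detached").remote()

# Visible from the same namespace.
handle = ray.get_actor("counter", namespace="trinity_unittest")
print(ray.get(handle.incr.remote()))  # 1

# Not visible from another namespace.
try:
    ray.get_actor("counter", namespace="some_other_namespace")
except ValueError:
    print("actor is not visible outside its own namespace")
```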
1 change: 0 additions & 1 deletion tests/common/vllm_test.py
@@ -135,7 +135,6 @@ async def test_generate(self):
await prepare_engines(self.engines, self.auxiliary_engines)
await self.model_wrapper.prepare()
self.assertEqual(self.model_wrapper.model_path, self.config.model.model_path)
self.assertEqual(await self.model_wrapper.model_path_async, self.config.model.model_path)
prompts = ["Hello, world!", "Hello, my name is"]
n = self.config.algorithm.repeat_times
if self.use_async:
5 changes: 5 additions & 0 deletions tests/explorer/scheduler_test.py
@@ -171,6 +171,11 @@ async def run_async(self) -> List[Experience]:

@ray.remote
class DummyModel(InferenceModel):
def __init__(self):
from trinity.common.config import InferenceModelConfig

super().__init__(InferenceModelConfig(model_path="dummy_model"))

def sync_model(self, model_version, update_weight_args_list):
return True

59 changes: 58 additions & 1 deletion tests/explorer/workflow_test.py
@@ -15,6 +15,7 @@

from tests.common.vllm_test import CHAT_TEMPLATE
from tests.tools import get_model_path, get_template_config, get_unittest_dataset_config
from trinity.common.config import InferenceModelConfig
from trinity.common.experience import EID, Experience
from trinity.common.models import create_inference_models
from trinity.common.models.model import ModelWrapper
@@ -551,7 +552,7 @@ async def monitor_routine():


class TestAgentScopeWorkflowAdapter(unittest.IsolatedAsyncioTestCase):
async def test_adapter(self):
async def test_adapter_v0(self):
try:
from agentscope.model import TrinityChatModel
except ImportError:
@@ -586,6 +587,58 @@ async def as_workflow_func(task, model) -> float:
self.assertEqual(result[1].reward, 0.1)
self.assertEqual(result[1].prompt_length, 2)

async def test_adapter_v1(self):
try:
from agentscope.model import ChatModelBase
from agentscope.tuner import JudgeOutput, WorkflowOutput
except ImportError:
self.skipTest("agentscope >= 1.0.12 is not installed")

async def as_workflow_func(task, model) -> WorkflowOutput:
self.assertIsInstance(task, dict)
self.assertIsInstance(model, ChatModelBase)
return WorkflowOutput(
reward=task["reward"],
response=task["reward"],
metrics={"workflow_metric_1": 0.0},
)

async def as_judge_func(task, response) -> JudgeOutput:
self.assertIsInstance(task, dict)
self.assertIsInstance(response, float)
return JudgeOutput(
reward=response,
metrics={"judge_metric_1": 1.0},
)

model = MagicMock()
openai_client = MagicMock()
openai_client.model_path = "Qwen/Qwen3-8B"
model.get_openai_async_client.return_value = openai_client
model.extract_experience_from_history.return_value = [
Experience(tokens=Tensor([0, 1, 2]), prompt_length=1, logprobs=Tensor([0.1, 0.2])),
]

as_adapter_cls = WORKFLOWS.get("agentscope_workflow_adapter_v1")
as_adapter = as_adapter_cls(
task=Task(
raw_task={"reward": 0.2},
workflow_args={
"workflow_func": as_workflow_func,
"judge_func": as_judge_func,
},
),
model=model,
)
result = await as_adapter.run_async()
self.assertEqual(len(result), 1)
self.assertEqual(result[0].reward, 0.2)
self.assertEqual(result[0].prompt_length, 1)
metrics = result[-1].metrics
self.assertEqual(len(metrics), 2)
self.assertEqual(metrics["workflow_metric_1"], 0.0)
self.assertEqual(metrics["judge_metric_1"], 1.0)


class DummyModelWrapper:
def __init__(self, model, engine_type="vllm", **kwargs):
@@ -678,9 +731,13 @@ async def mock_get_api_server_url_remote():
async def mock_get_model_version_remote():
return 1

async def mock_get_model_config_remote():
return InferenceModelConfig(model_path="dummy_model")

model = MagicMock()
model.get_api_server_url.remote = MagicMock(side_effect=mock_get_api_server_url_remote)
model.get_model_version.remote = MagicMock(side_effect=mock_get_model_version_remote)
model.get_model_config.remote = MagicMock(side_effect=mock_get_model_config_remote)

runner = WorkflowRunner(
config,
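The new `mock_get_model_config_remote` follows the same pattern as the surrounding mocks: a plain `MagicMock` stands in for a Ray actor handle, and an async function supplied as `side_effect` makes `handle.method.remote()` return a coroutine that the caller can await. A minimal, self-contained sketch of the pattern, with illustrative names:

```python
# Illustrative only: mocking an awaitable `.remote()` call with MagicMock.
import asyncio
from unittest.mock import MagicMock

async def mock_get_model_config_remote():
    return {"model_path": "dummy_model"}  # stand-in for an InferenceModelConfig

model = MagicMock()
model.get_model_config.remote = MagicMock(side_effect=mock_get_model_config_remote)

async def main():
    # Calling the mock invokes the async side_effect and returns its coroutine.
    config = await model.get_model_config.remote()
    assert config["model_path"] == "dummy_model"

asyncio.run(main())
```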
153 changes: 150 additions & 3 deletions tests/trainer/trainer_test.py
@@ -9,6 +9,7 @@
import unittest
from copy import deepcopy
from datetime import datetime
from typing import Dict
from unittest import mock

import ray
@@ -1134,10 +1135,10 @@ async def test_serve_with_trainer(self): # noqa: C901
+ metrics["rollout/model_1/total_request_count"],
metrics["rollout/total_experience_count"],
)
# at least updated to version 2
# at least updated to version 1
await asyncio.sleep(5) # wait for model version update
self.assertGreaterEqual(metrics["rollout/model_0/model_version"], 2)
self.assertGreaterEqual(metrics["rollout/model_1/model_version"], 2)
self.assertGreaterEqual(metrics["rollout/model_0/model_version"], 1)
self.assertGreaterEqual(metrics["rollout/model_1/model_version"], 1)
# check final checkpoint
_, step_num = get_checkpoint_dir_with_step_num(
checkpoint_root_path=serve_config.checkpoint_job_dir,
@@ -1433,3 +1434,149 @@ def test_trainer(self):
def tearDown(self):
# remove dir only when the test passed
shutil.rmtree(self.config.checkpoint_job_dir, ignore_errors=True)


@unittest.skip("Require agentscope >= 1.0.12")
class AgentScopeTunerTest(unittest.IsolatedAsyncioTestCase):
def setUp(self) -> None:
ray.init(ignore_reinit_error=True)

def tearDown(self) -> None:
ray.shutdown(_exiting_interpreter=True)

def test_agentscope_tuner(self):
try:
from agentscope.agent import ReActAgent
from agentscope.formatter import OpenAIChatFormatter
from agentscope.message import Msg
from agentscope.model import ChatModelBase
from agentscope.tuner import (
Algorithm,
Dataset,
JudgeOutput,
TunerChatModel,
WorkflowOutput,
tune,
)
except ImportError:
self.skipTest("agentscope >= 1.0.12 is not installed")

async def workflow_func(
task: Dict,
model: ChatModelBase,
auxiliary_models: Dict[str, ChatModelBase],
) -> WorkflowOutput:
assert isinstance(model, ChatModelBase)
assert "judge_model" in auxiliary_models
assert isinstance(auxiliary_models["judge_model"], ChatModelBase)
agent = ReActAgent(
name="test_agent",
model=model,
sys_prompt="You are a helpful assistant.",
formatter=OpenAIChatFormatter(),
)
st = time.time()
response = await agent.reply(Msg("user", task["question"], role="user"))
et = time.time()
return WorkflowOutput(response=response, metrics={"workflow_time": et - st})

async def judge_func(
task: Dict, response: Msg, auxiliary_models: Dict[str, ChatModelBase]
) -> JudgeOutput:
assert "judge_model" in auxiliary_models
judge_model = auxiliary_models["judge_model"]
assert isinstance(judge_model, ChatModelBase)
agent = ReActAgent(
name="judge_agent",
model=judge_model,
sys_prompt="You are a judge to evaluate the correctness of answers.",
formatter=OpenAIChatFormatter(),
)
workflow_text_response = response.get_text_content()
st = time.time()
judge_response = await agent.reply(
Msg(
"user",
f"Question: {task['question']}\nAnswer: {workflow_text_response}\nIs the answer correct? Reply with 'Yes' or 'No'.",
role="user",
)
)
et = time.time()
judge_response = judge_response.get_text_content()
if judge_response is not None and "yes" in judge_response.lower():
is_correct = True
else:
is_correct = False
return JudgeOutput(
reward=float(is_correct),
metrics={"judge_time": et - st},
)

gsm8k_dataset = get_unittest_dataset_config("gsm8k")

dataset = Dataset(
path=gsm8k_dataset.path,
split="train",
total_steps=2,
)
eval_dataset = Dataset(
path=gsm8k_dataset.path,
split="test",
)

model = TunerChatModel(
model_path=get_model_path(),
max_model_len=4096,
max_tokens=2048,
inference_engine_num=2,
)

auxiliary_models = {
"judge_model": TunerChatModel(
model_path=get_model_path(),
max_model_len=8192,
max_tokens=2048,
inference_engine_num=2,
)
}

algorithm = Algorithm(
algorithm_type="multi_step_grpo",
batch_size=4,
group_size=4,
eval_interval_steps=2,
save_interval_steps=2,
)

tune(
workflow_func=workflow_func,
judge_func=judge_func,
train_dataset=dataset,
eval_dataset=eval_dataset,
model=model,
auxiliary_models=auxiliary_models,
algorithm=algorithm,
)
# check checkpoint dir in `./checkpoints/AgentScope/Experiment-<timestamp>`
self.assertTrue(os.path.exists("./checkpoints/AgentScope"))
exp_dirs = os.listdir("./checkpoints/AgentScope")
self.assertGreaterEqual(len(exp_dirs), 1)
latest_exp_dir = sorted(exp_dirs)[-1]
exp_dir_path = os.path.join("./checkpoints/AgentScope", latest_exp_dir)
_, step_num = get_checkpoint_dir_with_step_num(
checkpoint_root_path=exp_dir_path,
trainer_type="verl",
)
self.assertEqual(step_num, 2)
# check tensorboard
parser = TensorBoardParser(os.path.join(exp_dir_path, "monitor", "tensorboard"))
rollout_metrics = parser.metric_list("rollout")
self.assertIn("rollout/workflow_time/mean", rollout_metrics)
self.assertIn("rollout/judge_time/mean", rollout_metrics)
self.assertEqual(parser.metric_max_step(rollout_metrics[0]), 2)
eval_metrics = parser.metric_list("eval")
self.assertGreater(len(eval_metrics), 0)
self.assertEqual(parser.metric_max_step(eval_metrics[0]), 2)
actor_metrics = parser.metric_list("actor")
self.assertGreater(len(actor_metrics), 0)
self.assertEqual(parser.metric_max_step(actor_metrics[0]), 2)
7 changes: 7 additions & 0 deletions trinity/common/config.py
@@ -261,6 +261,8 @@ class TasksetConfig:
total_epochs: int = 1 # automatically set
# ! DO NOT SET, automatically set from buffer.total_steps
total_steps: Optional[int] = None # automatically set
# ! DO NOT SET, automatically set from ray_namespace
ray_namespace: Optional[str] = None

def to_storage_config(self) -> StorageConfig:
storage_config = StorageConfig(
@@ -285,6 +287,7 @@ def to_storage_config(self) -> StorageConfig:
batch_size=self.batch_size,
total_epochs=self.total_epochs,
total_steps=self.total_steps,
ray_namespace=self.ray_namespace,
)
return storage_config

@@ -324,6 +327,8 @@ class ExperienceBufferConfig:
total_epochs: int = 1 # automatically set
# ! DO NOT SET, automatically set from buffer.total_steps
total_steps: Optional[int] = None # automatically set
# ! DO NOT SET, automatically set from ray_namespace
ray_namespace: Optional[str] = None

def to_storage_config(self) -> StorageConfig:
storage_config = StorageConfig(
Expand All @@ -345,6 +350,7 @@ def to_storage_config(self) -> StorageConfig:
tokenizer_path=self.tokenizer_path,
total_epochs=self.total_epochs,
total_steps=self.total_steps,
ray_namespace=self.ray_namespace,
)
return storage_config

@@ -546,6 +552,7 @@ class InferenceModelConfig:

# ! DO NOT SET
bundle_indices: str = ""
ray_namespace: Optional[str] = None

# ! DO NOT SET, automatically set from model.lora_configs
enable_lora: bool = False
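The new `ray_namespace` fields are marked "DO NOT SET" because they are filled in automatically and then forwarded into `StorageConfig` by `to_storage_config`, presumably so that every buffer ends up in the same Ray namespace as the rest of the job. Below is a minimal, hypothetical sketch of that propagation pattern; the `propagate_namespace` helper and the simplified dataclasses are illustrative, not Trinity's actual API.

```python
# Illustrative only: a top-level config decides the namespace once and copies it
# into nested configs before they are converted to storage configs.
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class SimpleTasksetConfig:  # simplified stand-in for TasksetConfig
    ray_namespace: Optional[str] = None  # ! DO NOT SET, filled automatically

@dataclass
class SimpleTopLevelConfig:
    ray_namespace: str = "trinity"
    taskset: SimpleTasksetConfig = field(default_factory=SimpleTasksetConfig)

    def propagate_namespace(self) -> None:
        # Mirrors what a check_and_update-style pass would do.
        self.taskset.ray_namespace = self.ray_namespace

cfg = SimpleTopLevelConfig(ray_namespace="trinity_unittest")
cfg.propagate_namespace()
assert cfg.taskset.ray_namespace == "trinity_unittest"
```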
6 changes: 4 additions & 2 deletions trinity/common/models/__init__.py
@@ -58,7 +58,7 @@ def create_inference_models(
from trinity.common.models.tinker_model import TinkerModel

engine_cls = TinkerModel
namespace = ray.get_runtime_context().namespace
namespace = config.ray_namespace
rollout_engines = [
ray.remote(engine_cls)
.options(
@@ -111,7 +111,8 @@ def create_inference_models(
for bundle_id, node_id in bundle_node_map.items():
node_bundle_map[node_id].append(bundle_id)
allocator = _BundleAllocator(node_bundle_map)
namespace = ray.get_runtime_context().namespace
namespace = config.ray_namespace
config.explorer.rollout_model.ray_namespace = namespace
# create rollout models
# in 'serve' mode, we always enable openai api for rollout model
if config.mode == "serve":
@@ -147,6 +148,7 @@
# create auxiliary models
for i, model_config in enumerate(config.explorer.auxiliary_models):
engines = []
model_config.ray_namespace = namespace
for j in range(model_config.engine_num):
bundles_for_engine = allocator.allocate(model_config.tensor_parallel_size)
model_config.enable_openai_api = True
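Reading the namespace from `config.ray_namespace` instead of `ray.get_runtime_context().namespace` makes actor placement deterministic: when a driver calls `ray.init()` without a namespace, the runtime context reports an anonymous per-job namespace (a UUID), so components started from different drivers could otherwise fail to find each other's named actors. A minimal sketch of the difference, illustrative and not taken from this PR:

```python
# Illustrative only: anonymous vs. explicit Ray namespaces.
import ray

ray.init(ignore_reinit_error=True)  # no namespace specified
print(ray.get_runtime_context().namespace)  # anonymous UUID, differs per driver
ray.shutdown()

ray.init(namespace="trinity_unittest", ignore_reinit_error=True)
print(ray.get_runtime_context().namespace)  # "trinity_unittest", deterministic
```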