Async model support OpenAI compatible API #44
Annotations
5 errors
unittest
Process completed with exit code 1.
Failed Test: tests/explorer/runner_pool_test.py::RunnerPoolTest::test_runner_pool
tests/explorer/runner_pool_test.py::RunnerPoolTest::test_runner_pool: The test failed in the call phase due to an assertion error.

self = <tests.explorer.runner_pool_test.RunnerPoolTest testMethod=test_runner_pool>
    def test_runner_pool(self):
        pool = RunnerPool(self.config, [DummyModel.remote(), DummyModel.remote()])
        taskset_config = get_unittest_dataset_config("countdown")
        tasks = [
            Task(
                workflow=DummyWorkflow,
                format_args=taskset_config.format,
                rollout_args=taskset_config.rollout_args,
                is_eval=False,
                raw_task={
                    taskset_config.format.prompt_key: "timeout_100",
                },
            ),
            Task(
                workflow=DummyWorkflow,
                format_args=taskset_config.format,
                rollout_args=taskset_config.rollout_args,
                is_eval=False,
                raw_task={
                    taskset_config.format.prompt_key: "exception",
                },
            ),
            Task(
                workflow=DummyWorkflow,
                format_args=taskset_config.format,
                rollout_args=taskset_config.rollout_args,
                is_eval=False,
                raw_task={
                    taskset_config.format.prompt_key: "timeout_2",
                },
            ),
            Task(
                workflow=DummyWorkflow,
                format_args=taskset_config.format,
                rollout_args=taskset_config.rollout_args,
                is_eval=False,
                raw_task={
                    taskset_config.format.prompt_key: "success",
                },
            ),
            Task(
                workflow=DummyWorkflow,
                format_args=taskset_config.format,
                rollout_args=taskset_config.rollout_args,
                is_eval=False,
                raw_task={
                    taskset_config.format.prompt_key: "timeout_101",
                },
            ),
            Task(
                workflow=DummyWorkflow,
                format_args=taskset_config.format,
                rollout_args=taskset_config.rollout_args,
                is_eval=False,
                raw_task={
                    taskset_config.format.prompt_key: "exit",
                },
            ),
        ]
        pool.run_tasks(
            tasks=tasks,
        )
        # The expected return order is: `exception` -> `timeout_2` -> `success` -> (`timeout_100` and `timeout_101`) -> `exit`
        # 1. `exception`
        st = time.time()
        status = pool.get_next_unorder()
        et = time.time()
        self.assertTrue(et - st < 2)
        print(f"First task use time: {et - st}")
        self.assertEqual(len(status), 1)
        self.assertFalse(status[0].ok)
        # 2. `timeout_2`
        st = time.time()
        status = pool.get_next_unorder()
        et = time.time()
>       self.assertTrue(et - st > 2)
E       AssertionError: False is not true

tests/explorer/runner_pool_test.py:160: AssertionError
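The assertion that fails here encodes a timing assumption: after the `exception` task is collected, the next call to `get_next_unorder()` is expected to block for more than 2 seconds while `timeout_2` finishes. If the pool runs all tasks concurrently, `timeout_2` has already been running since `run_tasks()` was called, so the wait measured from the second call can come in just under 2 seconds. Below is a minimal sketch of that effect, assuming the pool collects whichever result is ready first with something like `ray.wait`; the `dummy_workflow` helper and the durations are made up for illustration and are not the project's code.

    import time

    import ray

    ray.init(ignore_reinit_error=True)

    @ray.remote
    def dummy_workflow(kind: str, duration: float) -> str:
        # Hypothetical stand-in for DummyWorkflow: "exception" fails right away,
        # the timeout_* variants just sleep for `duration` seconds.
        if kind == "exception":
            raise RuntimeError("boom")
        time.sleep(duration)
        return kind

    # Both tasks start running as soon as they are submitted,
    # mirroring pool.run_tasks(tasks=tasks).
    futures = [
        dummy_workflow.remote("exception", 0.0),
        dummy_workflow.remote("timeout_2", 2.0),
    ]

    # First collection: the failing task is ready almost immediately.
    done, futures = ray.wait(futures, num_returns=1)

    # Second collection: timeout_2 has been running concurrently since
    # submission, so the wait measured from here is roughly 2 seconds minus
    # the time already spent, i.e. it can land just under 2.0, which is
    # exactly the case `self.assertTrue(et - st > 2)` rejects.
    st = time.time()
    done, futures = ray.wait(futures, num_returns=1)
    print(f"second result ready after {time.time() - st:.2f}s")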
Failed Test: tests/common/vllm_test.py::TestModelWrapperAsyncTPV1::test_generate
tests/common/vllm_test.py::TestModelWrapperAsyncTPV1::test_generate: The test failed in the call phase.

self = <tests.common.vllm_test.TestModelWrapperAsyncTPV1 testMethod=test_generate>
    def setUp(self):
        self.config = get_template_config()
        self.config.model.model_path = get_model_path()
        self.config.explorer.engine_type = "vllm_async"
        self.config.explorer.engine_num = 2
        self.config.explorer.tensor_parallel_size = 2
        self.config.buffer.explorer_input.taskset.rollout_args.repeat_times = 2
        self.config.explorer.use_v1 = True
        self.config.explorer.chat_template = CHAT_TEMPLATE
>       self.engines, self.auxiliary_engines = create_inference_models(self.config)

tests/common/vllm_test.py:184:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

config = Config(mode='both', data_processor=DataProcessorConfig(data_workflow_url=None, source_data_path='', format=FormatConfi...eout=1200, wait_for_checkpoint=False, master_address=None, master_port=None, explorer_world_size=None, backend='nccl'))

    def create_inference_models(
        config: Config,
    ) -> Tuple[List[InferenceModel], List[InferenceModel]]:
        """Create `engine_num` rollout models.
        Each model has `tensor_parallel_size` workers.
        """
        import ray
        from ray.util.placement_group import placement_group, placement_group_table
        from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

        from trinity.common.models.vllm_async_model import vLLMAysncRolloutModel
        from trinity.common.models.vllm_model import vLLMRolloutModel

        engine_num = config.explorer.engine_num
        tensor_parallel_size = config.explorer.tensor_parallel_size
        is_multi_process = config.explorer.tensor_parallel_size > 1
        if config.explorer.enable_openai_api and config.explorer.engine_type != "vllm_async":
            raise ValueError("OpenAI API is only supported for vllm_async engine")
        rollout_engines = []
        if config.explorer.engine_type == "vllm":
            engine_cls = vLLMRolloutModel
        elif config.explorer.engine_type == "vllm_async":
            engine_cls = vLLMAysncRolloutModel
        else:
            raise ValueError(f"Unknown engine type: {config.explorer.engine_type}")
        main_bundles = [{"GPU": 1, "CPU": 1} for _ in range(engine_num * tensor_parallel_size)]
        auxiliary_bundles = [
            {"GPU": 1, "CPU": 1}
            for _ in range(
                sum([model.tensor_parallel_size for model in config.explorer.auxiliary_models])
            )
        ]
        pg = placement_group(main_bundles + auxiliary_bundles, strategy="PACK")
        ray.get(pg.ready())
        rollout_engines = []
        auxiliary_engines = []
        # to address https://github.com/ray-project/ray/issues/51117
        # aggregate bundles belonging to the same node
        bundle_node_map = placement_group_table(pg)["bundles_to_node_id"]
        node_bundle_map = defaultdict(list)
        for bundle_id, node_id in bundle_node_map.items():
            node_bundle_map[node_id].append(bundle_id)
        allocator = _BundleAllocator(node_bundle_map)
        # create rollout models
        for _ in range(config.explorer.engine_num):
            bundles_for_engine = allocator.allocate(config.explorer.tensor_parallel_size)
            model_config = InferenceModelConfig(
                model_path=config.model.model_path,
                tensor_parallel_size=config.explorer.tensor_parallel_size,
                use_v1=config.explorer.use_v1,
                max_prompt_tokens=config.model.max_prompt_tokens,
                max_response_tokens=config.model.max_response_tokens,
                enforce_eager=config.explorer.enforce_eager,
                enable_prefix_caching=config.explorer.enable_prefix_caching,
                enable_chunked_prefill=config.explorer.enable_chunked_prefill,
                enable_thinking=config.model.enable_thinking,
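The annotation is cut off while `create_inference_models` builds the per-engine `InferenceModelConfig`, so the underlying error is not visible here. For orientation, the function packs `engine_num * tensor_parallel_size` GPU bundles (plus bundles for any auxiliary models) into a single PACK placement group, then groups the bundle ids by node so that each engine's tensor-parallel workers can be co-located, which is the workaround referenced for ray issue 51117. `_BundleAllocator` itself is not shown in the traceback; the sketch below is only a guess at what such an allocator does, not the project's implementation.

    from collections import defaultdict
    from typing import Dict, List

    class BundleAllocator:
        # Hypothetical allocator: hands out placement-group bundle ids node by
        # node so that one engine's tensor-parallel workers stay on one host.
        def __init__(self, node_bundle_map: Dict[str, List[int]]):
            # node_bundle_map: node_id -> indices of the bundles placed on that node
            self._free = {node: list(bundles) for node, bundles in node_bundle_map.items()}

        def allocate(self, count: int) -> List[int]:
            # Prefer a node that still has `count` unused bundles.
            for node, bundles in self._free.items():
                if len(bundles) >= count:
                    taken, self._free[node] = bundles[:count], bundles[count:]
                    return taken
            raise RuntimeError(f"no node has {count} free bundles left")

    # Example: engine_num=2, tensor_parallel_size=2 on a single 4-GPU node,
    # matching the failing setUp() configuration.
    node_bundle_map = defaultdict(list, {"node-a": [0, 1, 2, 3]})
    allocator = BundleAllocator(node_bundle_map)
    print(allocator.allocate(2))  # -> [0, 1] for the first engine
    print(allocator.allocate(2))  # -> [2, 3] for the second engine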
Failed Test: tests/common/vllm_test.py::TestModelWrapperAsyncTPV0::test_generate
tests/common/vllm_test.py::TestModelWrapperAsyncTPV0::test_generate: The test failed in the call phase.

self = <tests.common.vllm_test.TestModelWrapperAsyncTPV0 testMethod=test_generate>
    def setUp(self):
        self.config = get_template_config()
        self.config.model.model_path = get_model_path()
        self.config.explorer.engine_type = "vllm_async"
        self.config.explorer.engine_num = 2
        self.config.explorer.tensor_parallel_size = 2
        self.config.explorer.use_v1 = False
        self.config.explorer.chat_template = CHAT_TEMPLATE
>       self.engines, self.auxiliary_engines = create_inference_models(self.config)

tests/common/vllm_test.py:170:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

config = Config(mode='both', data_processor=DataProcessorConfig(data_workflow_url=None, source_data_path='', format=FormatConfi...eout=1200, wait_for_checkpoint=False, master_address=None, master_port=None, explorer_world_size=None, backend='nccl'))

    def create_inference_models(
        config: Config,
    ) -> Tuple[List[InferenceModel], List[InferenceModel]]:
        """Create `engine_num` rollout models.
        Each model has `tensor_parallel_size` workers.
        """
        import ray
        from ray.util.placement_group import placement_group, placement_group_table
        from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

        from trinity.common.models.vllm_async_model import vLLMAysncRolloutModel
        from trinity.common.models.vllm_model import vLLMRolloutModel

        engine_num = config.explorer.engine_num
        tensor_parallel_size = config.explorer.tensor_parallel_size
        is_multi_process = config.explorer.tensor_parallel_size > 1
        if config.explorer.enable_openai_api and config.explorer.engine_type != "vllm_async":
            raise ValueError("OpenAI API is only supported for vllm_async engine")
        rollout_engines = []
        if config.explorer.engine_type == "vllm":
            engine_cls = vLLMRolloutModel
        elif config.explorer.engine_type == "vllm_async":
            engine_cls = vLLMAysncRolloutModel
        else:
            raise ValueError(f"Unknown engine type: {config.explorer.engine_type}")
        main_bundles = [{"GPU": 1, "CPU": 1} for _ in range(engine_num * tensor_parallel_size)]
        auxiliary_bundles = [
            {"GPU": 1, "CPU": 1}
            for _ in range(
                sum([model.tensor_parallel_size for model in config.explorer.auxiliary_models])
            )
        ]
        pg = placement_group(main_bundles + auxiliary_bundles, strategy="PACK")
        ray.get(pg.ready())
        rollout_engines = []
        auxiliary_engines = []
        # to address https://github.com/ray-project/ray/issues/51117
        # aggregate bundles belonging to the same node
        bundle_node_map = placement_group_table(pg)["bundles_to_node_id"]
        node_bundle_map = defaultdict(list)
        for bundle_id, node_id in bundle_node_map.items():
            node_bundle_map[node_id].append(bundle_id)
        allocator = _BundleAllocator(node_bundle_map)
        # create rollout models
        for _ in range(config.explorer.engine_num):
            bundles_for_engine = allocator.allocate(config.explorer.tensor_parallel_size)
            model_config = InferenceModelConfig(
                model_path=config.model.model_path,
                tensor_parallel_size=config.explorer.tensor_parallel_size,
                use_v1=config.explorer.use_v1,
                max_prompt_tokens=config.model.max_prompt_tokens,
                max_response_tokens=config.model.max_response_tokens,
                enforce_eager=config.explorer.enforce_eager,
                enable_prefix_caching=config.explorer.enable_prefix_caching,
                enable_chunked_prefill=config.explorer.enable_chunked_prefill,
                enable_thinking=config.model.enable_thinking,
                gpu_memory_utilization=config.explorer.gpu_memory_utilization,
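This is the same truncated traceback as the TPV1 case, here with `use_v1 = False`. One detail relevant to this PR: `create_inference_models` guards the new OpenAI-compatible endpoint with `enable_openai_api` and rejects every engine type other than `vllm_async`. A hedged sketch of a configuration that passes that guard follows; only `enable_openai_api`, `engine_type`, and the test helpers already visible in the tracebacks are taken from the source, the rest is assumption.

    # Sketch of a config that satisfies the guard in create_inference_models;
    # get_template_config/get_model_path are the test helpers used in setUp above,
    # so this assumes the project's test utilities are importable.
    config = get_template_config()
    config.model.model_path = get_model_path()
    config.explorer.engine_type = "vllm_async"   # any other engine_type raises ValueError
    config.explorer.enable_openai_api = True     # the OpenAI-compatible API added in this PR
    config.explorer.engine_num = 2
    config.explorer.tensor_parallel_size = 2

    engines, auxiliary_engines = create_inference_models(config)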
unittest
Process completed with exit code 1.