diff --git a/apps/vllm/judge.py b/apps/vllm/judge.py
index d1cd886b..015a3226 100644
--- a/apps/vllm/judge.py
+++ b/apps/vllm/judge.py
@@ -39,7 +39,10 @@ async def run(cfg: DictConfig):
     print(f"Responses: {responses}\n")
     print("Evaluating responses...")
     best_response_evaluations: list[str] = await judge.evaluate.route(
-        prompt=prompt, responses=responses, evaluation_mode=EvaluationMode.BEST_RESPONSE
+        prompt=prompt,
+        responses=responses,
+        ground_truth=ground_truth,
+        evaluation_mode=EvaluationMode.BEST_RESPONSE,
     )
     response_check_evaluations: list[str] = await judge.evaluate.route(
         prompt=prompt,
diff --git a/apps/vllm/llama3_8b.yaml b/apps/vllm/llama3_8b.yaml
index c4bc141b..64d6d344 100644
--- a/apps/vllm/llama3_8b.yaml
+++ b/apps/vllm/llama3_8b.yaml
@@ -6,6 +6,7 @@ policy:
   tensor_parallel_size: 2
   pipeline_parallel_size: 1
   enforce_eager: true
+  # task: "score"
   sampling_config:
     n: 2
     guided_decoding: false
diff --git a/apps/vllm/rewardjudge.py b/apps/vllm/rewardjudge.py
new file mode 100644
index 00000000..42ed0e15
--- /dev/null
+++ b/apps/vllm/rewardjudge.py
@@ -0,0 +1,51 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+#
+# This source code is licensed under the BSD-style license found in the
+# LICENSE file in the root directory of this source tree.
+
+"""To run:
+export HF_HUB_DISABLE_XET=1 && python -m apps.vllm.rewardjudge
+"""
+
+from forge.actors.judge import RewardModelJudge
+
+
+def run():
+    # metric_logging_cfg = cfg.get("metric_logging", {"console": {"log_per_rank": False}})
+    # mlogger = await get_or_create_metric_logger()
+    # await mlogger.init_backends.call_one(metric_logging_cfg)
+
+    prompt = "Jane has 12 apples. She gives 4 apples to her friend Mark, \
+        then buys 1 more apple, and finally splits all her apples equally among \
+        herself and her 2 siblings. How many apples does each person get?"
+    response1 = "1. Jane starts with 12 apples and gives 4 to Mark. 12 - 4 = 8. \
+        Jane now has 8 apples. 2. Jane buys 1 more apple. 8 + 1 = 9. Jane now has 9 \
+        apples. 3. Jane splits the 9 apples equally among herself and her 2 siblings \
+        (3 people in total). 9 ÷ 3 = 3 apples each. Each person gets 3 apples."
+    response2 = "1. Jane starts with 12 apples and gives 4 to Mark. 12 - 4 = 8. \
+        Jane now has 8 apples. 2. Jane buys 1 more apple. 8 + 1 = 9. Jane now has 9 \
+        apples. 3. Jane splits the 9 apples equally among her 2 siblings (2 people in \
+        total). 9 ÷ 2 = 4.5 apples each. Each person gets 4 apples."
+
+    responses = [response1, response2]
+
+    print(f"Prompt: {prompt}")
+    print(f"Responses: {responses}\n")
+    print("Evaluating responses...")
+
+    model = "Skywork/Skywork-Reward-V2-Llama-3.1-8B"
+    judge = RewardModelJudge(model)
+    scores = judge.evaluate(prompt=prompt, responses=responses)
+
+    print("\nGeneration Results:")
+    print("=" * 80)
+    for batch, (response, score) in enumerate(zip(responses, scores)):
+        print(f"Sample {batch + 1}")
+        print(f"Response: {response}")
+        print(f"Score: {score}")
+        print("-" * 80)
+
+
+if __name__ == "__main__":
+    run()
diff --git a/src/forge/actors/judge.py b/src/forge/actors/judge.py
index 0056dfed..a74304c7 100644
--- a/src/forge/actors/judge.py
+++ b/src/forge/actors/judge.py
@@ -185,30 +185,26 @@ async def evaluate(
         return self._postprocess_output(response)
 
 
+from vllm import LLM
+from vllm.outputs import ScoringRequestOutput
+
+
 @dataclass
-class RewardModelJudge(Policy):
+class RewardModelJudge:
     """
     `RewardModels` are typically discriminative models, post trained to evaluate
    responses without further prompting required.
     """
 
-    # TODO: Add reward models formatting
-    def wrapped_prompt(
-        self, prompt: str, responses: list[str], ground_truth: None | str = None
-    ) -> str:
-        return prompt
+    def __init__(self, model: str):
+        # def __init__(self, cfgmodel: str):
+        self.model_name = model
+        self.model = LLM(model=model, task="score")
+        # policy = await Policy.options(**cfg.services.policy).as_service(**cfg.policy)
 
-    def _postprocess_output(
-        self, outputs: list[Completion], ground_truth: None | str = None
-    ) -> list[str]:
-        return [output.text for output in outputs]
+    def _postprocess_output(self, outputs: list[ScoringRequestOutput]) -> list[float]:
+        return [output.outputs.score for output in outputs]
 
-    @endpoint
-    async def evaluate(
-        self,
-        prompt: str,
-        responses: list[str],
-    ) -> list[str]:
-        wrapped_prompt: str = self._wrap_prompt(prompt, responses)
-        response: List[Completion] = await self.generate._method(self, wrapped_prompt)
-        return self._postprocess_output(response)
+    def evaluate(self, prompt: str, responses: list[str]) -> list[float]:
+        outputs: list[ScoringRequestOutput] = self.model.score(prompt, responses)
+        return self._postprocess_output(outputs)
diff --git a/src/forge/actors/policy.py b/src/forge/actors/policy.py
index 464674f2..05c1e15e 100644
--- a/src/forge/actors/policy.py
+++ b/src/forge/actors/policy.py
@@ -329,6 +329,7 @@ async def generate(self, prompt: str, priority: int = 0) -> list[Completion]:
         t.step("prompt_truncation")
 
         # process and tokenize prompt
+        # (forge/issues/332) Signature of this changes
         prompt_str, request = self.processor.process_inputs(
             request_id=request_id,
             prompt=prompt_dict,
@@ -340,6 +341,7 @@
             priority=priority,
             data_parallel_rank=None,
         )
+        print("Pooling: ", request.pooling_params)
         t.step("process_inputs")
 
         # Wait until we're accepting requests (releases lock while waiting)
@@ -590,6 +592,10 @@ async def setup(self):
 
     @endpoint
     async def execute_model(self, schedule: SchedulerOutput):
+        # print("execute_model: ", self.worker.get_supported_pooling_tasks())
+        # print("execute_model: ", schedule.scheduled_new_reqs)
+        # print("execute_model: ", schedule.scheduled_new_reqs[0].pooling_params)
+        # print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
         return self.worker.execute_model(schedule)
 
     async def _load_tensor_parallel_state_dict(