From a99d8bcdfe3fb14fcd4f0033fda70d928caf262b Mon Sep 17 00:00:00 2001 From: Jack-Khuu Date: Mon, 6 Oct 2025 13:24:58 -0700 Subject: [PATCH 1/3] push --- apps/vllm/judge.py | 5 ++- apps/vllm/rewardjudge.py | 68 +++++++++++++++++++++++++++++++++++++++ src/forge/actors/judge.py | 60 +++++++++++++++++++++++++++------- 3 files changed, 120 insertions(+), 13 deletions(-) create mode 100644 apps/vllm/rewardjudge.py diff --git a/apps/vllm/judge.py b/apps/vllm/judge.py index d1cd886b1..015a32260 100644 --- a/apps/vllm/judge.py +++ b/apps/vllm/judge.py @@ -39,7 +39,10 @@ async def run(cfg: DictConfig): print(f"Responses: {responses}\n") print("Evaluating responses...") best_response_evaluations: list[str] = await judge.evaluate.route( - prompt=prompt, responses=responses, evaluation_mode=EvaluationMode.BEST_RESPONSE + prompt=prompt, + responses=responses, + ground_truth=ground_truth, + evaluation_mode=EvaluationMode.BEST_RESPONSE, ) response_check_evaluations: list[str] = await judge.evaluate.route( prompt=prompt, diff --git a/apps/vllm/rewardjudge.py b/apps/vllm/rewardjudge.py new file mode 100644 index 000000000..6e5488c81 --- /dev/null +++ b/apps/vllm/rewardjudge.py @@ -0,0 +1,68 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. + +"""To run: +export HF_HUB_DISABLE_XET=1 +python -m apps.vllm.judge --config apps/vllm/llama3_8b.yaml +""" + +# flake8: noqa + +import asyncio + +from forge.actors.judge import RewardModelJudge + + +async def run(): + + prompt = "Jane has 12 apples. She gives 4 apples to her friend Mark, then buys 1 more apple, and finally splits all her apples equally among herself and her 2 siblings. How many apples does each person get?" + response1 = "1. Jane starts with 12 apples and gives 4 to Mark. 12 - 4 = 8. Jane now has 8 apples. 2. Jane buys 1 more apple. 8 + 1 = 9. Jane now has 9 apples. 3. Jane splits the 9 apples equally among herself and her 2 siblings (3 people in total). 9 ÷ 3 = 3 apples each. Each person gets 3 apples." + response2 = "1. Jane starts with 12 apples and gives 4 to Mark. 12 - 4 = 8. Jane now has 8 apples. 2. Jane buys 1 more apple. 8 + 1 = 9. Jane now has 9 apples. 3. Jane splits the 9 apples equally among her 2 siblings (2 people in total). 9 ÷ 2 = 4.5 apples each. Each person gets 4 apples." 
+ + responses = [response1, response2] + + conv1 = [ + {"role": "user", "content": prompt}, + {"role": "assistant", "content": response1}, + ] + conv2 = [ + {"role": "user", "content": prompt}, + {"role": "assistant", "content": response2}, + ] + + print(f"Prompt: {prompt}") + print(f"Responses: {responses}\n") + print("Evaluating responses...") + + judge = RewardModelJudge("Skywork/Skywork-Reward-V2-Llama-3.1-8B", num_labels=1) + judge.evaluate(responses=[conv1, conv2]) + + # best_response_evaluations: list[str] = await judge.evaluate.route( + # prompt=prompt, + # responses=responses, + # ground_truth=ground_truth, + # evaluation_mode=EvaluationMode.BEST_RESPONSE, + # ) + + # print("\nGeneration Results:") + # print("=" * 80) + # for batch, (best, fact) in enumerate( + # zip(best_response_evaluations, response_check_evaluations) + # ): + # print(f"Sample {batch + 1}") + # print(f"Evaluation (BEST_RESPONSE): {best}") + # print(f"Evaluation (RESPONSE_CHECK): {fact}") + # print("-" * 80) + + # print("\nShutting down...") + + +def recipe_main() -> None: + asyncio.run(run()) + + +if __name__ == "__main__": + recipe_main() diff --git a/src/forge/actors/judge.py b/src/forge/actors/judge.py index 0056dfed3..da1269adf 100644 --- a/src/forge/actors/judge.py +++ b/src/forge/actors/judge.py @@ -7,6 +7,8 @@ from dataclasses import dataclass from enum import auto, Enum +import torch + from monarch.actor import endpoint from forge.actors.policy import Policy @@ -186,29 +188,63 @@ async def evaluate( @dataclass -class RewardModelJudge(Policy): +class RewardModelJudge: """ `RewardModels` are typically discriminative models, post trained to evaluate responses without further prompting required. """ - # TODO: Add reward models formatting - def wrapped_prompt( - self, prompt: str, responses: list[str], ground_truth: None | str = None + def __init__(self, model: str, num_labels: int): + from transformers import AutoModelForSequenceClassification, AutoTokenizer + + self.model_name = model + self.model = AutoModelForSequenceClassification.from_pretrained( + model, num_labels=num_labels + ) + self.tokenizer = AutoTokenizer.from_pretrained(model) + + def _wrap_prompt( + self, + prompt: None | str = None, + responses: None | list[str] = None, + ground_truth: None | str = None, ) -> str: - return prompt + conv1 = responses[0] + conv2 = responses[1] + + conv1_formatted = self.tokenizer.apply_chat_template(conv1, tokenize=False) + conv2_formatted = self.tokenizer.apply_chat_template(conv2, tokenize=False) + if self.tokenizer.bos_token is not None and conv1_formatted.startswith( + self.tokenizer.bos_token + ): + conv1_formatted = conv1_formatted[len(self.tokenizer.bos_token) :] + if self.tokenizer.bos_token is not None and conv2_formatted.startswith( + self.tokenizer.bos_token + ): + conv2_formatted = conv2_formatted[len(self.tokenizer.bos_token) :] + conv1_tokenized = self.tokenizer(conv1_formatted, return_tensors="pt") + conv2_tokenized = self.tokenizer(conv2_formatted, return_tensors="pt") + return conv1_tokenized, conv2_tokenized def _postprocess_output( self, outputs: list[Completion], ground_truth: None | str = None ) -> list[str]: return [output.text for output in outputs] - @endpoint - async def evaluate( + def evaluate( self, - prompt: str, - responses: list[str], + prompt: None | str = None, + responses: None | list[str] = None, + ground_truth: None | str = None, ) -> list[str]: - wrapped_prompt: str = self._wrap_prompt(prompt, responses) - response: List[Completion] = await self.generate._method(self, 
wrapped_prompt) - return self._postprocess_output(response) + conv1_tokenized, conv2_tokenized = self._wrap_prompt( + prompt, responses, ground_truth + ) + + with torch.no_grad(): + score1 = self.model(**conv1_tokenized).logits[0][0].item() + score2 = self.model(**conv2_tokenized).logits[0][0].item() + print(f"Score for response 1: {score1}") + print(f"Score for response 2: {score2}") + # response: List[Completion] = await self.generate._method(self, wrapped_prompt) + # return self._postprocess_output(response) From 3b581728cbe75b2444de6804ff3e25389b587426 Mon Sep 17 00:00:00 2001 From: Jack-Khuu Date: Mon, 6 Oct 2025 16:41:38 -0700 Subject: [PATCH 2/3] Switch to vllm --- apps/vllm/rewardjudge.py | 69 ++++++++++++++------------------------- src/forge/actors/judge.py | 64 +++++++----------------------------- 2 files changed, 35 insertions(+), 98 deletions(-) diff --git a/apps/vllm/rewardjudge.py b/apps/vllm/rewardjudge.py index 6e5488c81..207442c68 100644 --- a/apps/vllm/rewardjudge.py +++ b/apps/vllm/rewardjudge.py @@ -5,64 +5,43 @@ # LICENSE file in the root directory of this source tree. """To run: -export HF_HUB_DISABLE_XET=1 -python -m apps.vllm.judge --config apps/vllm/llama3_8b.yaml +export HF_HUB_DISABLE_XET=1 python -m apps.vllm.rewardjudge """ -# flake8: noqa - -import asyncio - from forge.actors.judge import RewardModelJudge -async def run(): - - prompt = "Jane has 12 apples. She gives 4 apples to her friend Mark, then buys 1 more apple, and finally splits all her apples equally among herself and her 2 siblings. How many apples does each person get?" - response1 = "1. Jane starts with 12 apples and gives 4 to Mark. 12 - 4 = 8. Jane now has 8 apples. 2. Jane buys 1 more apple. 8 + 1 = 9. Jane now has 9 apples. 3. Jane splits the 9 apples equally among herself and her 2 siblings (3 people in total). 9 ÷ 3 = 3 apples each. Each person gets 3 apples." - response2 = "1. Jane starts with 12 apples and gives 4 to Mark. 12 - 4 = 8. Jane now has 8 apples. 2. Jane buys 1 more apple. 8 + 1 = 9. Jane now has 9 apples. 3. Jane splits the 9 apples equally among her 2 siblings (2 people in total). 9 ÷ 2 = 4.5 apples each. Each person gets 4 apples." +def run(): + prompt = "Jane has 12 apples. She gives 4 apples to her friend Mark, \ + then buys 1 more apple, and finally splits all her apples equally among \ + herself and her 2 siblings. How many apples does each person get?" + response1 = "1. Jane starts with 12 apples and gives 4 to Mark. 12 - 4 = 8. \ + Jane now has 8 apples. 2. Jane buys 1 more apple. 8 + 1 = 9. Jane now has 9 \ + apples. 3. Jane splits the 9 apples equally among herself and her 2 siblings \ + (3 people in total). 9 ÷ 3 = 3 apples each. Each person gets 3 apples." + response2 = "1. Jane starts with 12 apples and gives 4 to Mark. 12 - 4 = 8. \ + Jane now has 8 apples. 2. Jane buys 1 more apple. 8 + 1 = 9. Jane now has 9 \ + apples. 3. Jane splits the 9 apples equally among her 2 siblings (2 people in \ + total). 9 ÷ 2 = 4.5 apples each. Each person gets 4 apples." 
responses = [response1, response2] - conv1 = [ - {"role": "user", "content": prompt}, - {"role": "assistant", "content": response1}, - ] - conv2 = [ - {"role": "user", "content": prompt}, - {"role": "assistant", "content": response2}, - ] - print(f"Prompt: {prompt}") print(f"Responses: {responses}\n") print("Evaluating responses...") - judge = RewardModelJudge("Skywork/Skywork-Reward-V2-Llama-3.1-8B", num_labels=1) - judge.evaluate(responses=[conv1, conv2]) - - # best_response_evaluations: list[str] = await judge.evaluate.route( - # prompt=prompt, - # responses=responses, - # ground_truth=ground_truth, - # evaluation_mode=EvaluationMode.BEST_RESPONSE, - # ) - - # print("\nGeneration Results:") - # print("=" * 80) - # for batch, (best, fact) in enumerate( - # zip(best_response_evaluations, response_check_evaluations) - # ): - # print(f"Sample {batch + 1}") - # print(f"Evaluation (BEST_RESPONSE): {best}") - # print(f"Evaluation (RESPONSE_CHECK): {fact}") - # print("-" * 80) - - # print("\nShutting down...") - + model = "Skywork/Skywork-Reward-V2-Llama-3.1-8B" + judge = RewardModelJudge(model) + scores = judge.evaluate(prompt=prompt, responses=responses) -def recipe_main() -> None: - asyncio.run(run()) + print("\nGeneration Results:") + print("=" * 80) + for batch, (response, score) in enumerate(zip(responses, scores)): + print(f"Sample {batch + 1}") + print(f"Response: {response}") + print(f"Score: {score}") + print("-" * 80) if __name__ == "__main__": - recipe_main() + run() diff --git a/src/forge/actors/judge.py b/src/forge/actors/judge.py index da1269adf..887f23754 100644 --- a/src/forge/actors/judge.py +++ b/src/forge/actors/judge.py @@ -7,8 +7,6 @@ from dataclasses import dataclass from enum import auto, Enum -import torch - from monarch.actor import endpoint from forge.actors.policy import Policy @@ -187,6 +185,10 @@ async def evaluate( return self._postprocess_output(response) +from vllm import LLM +from vllm.outputs import ScoringRequestOutput + + @dataclass class RewardModelJudge: """ @@ -194,57 +196,13 @@ class RewardModelJudge: evaluate responses without further prompting required. 
""" - def __init__(self, model: str, num_labels: int): - from transformers import AutoModelForSequenceClassification, AutoTokenizer - + def __init__(self, model: str): self.model_name = model - self.model = AutoModelForSequenceClassification.from_pretrained( - model, num_labels=num_labels - ) - self.tokenizer = AutoTokenizer.from_pretrained(model) - - def _wrap_prompt( - self, - prompt: None | str = None, - responses: None | list[str] = None, - ground_truth: None | str = None, - ) -> str: - conv1 = responses[0] - conv2 = responses[1] - - conv1_formatted = self.tokenizer.apply_chat_template(conv1, tokenize=False) - conv2_formatted = self.tokenizer.apply_chat_template(conv2, tokenize=False) - if self.tokenizer.bos_token is not None and conv1_formatted.startswith( - self.tokenizer.bos_token - ): - conv1_formatted = conv1_formatted[len(self.tokenizer.bos_token) :] - if self.tokenizer.bos_token is not None and conv2_formatted.startswith( - self.tokenizer.bos_token - ): - conv2_formatted = conv2_formatted[len(self.tokenizer.bos_token) :] - conv1_tokenized = self.tokenizer(conv1_formatted, return_tensors="pt") - conv2_tokenized = self.tokenizer(conv2_formatted, return_tensors="pt") - return conv1_tokenized, conv2_tokenized - - def _postprocess_output( - self, outputs: list[Completion], ground_truth: None | str = None - ) -> list[str]: - return [output.text for output in outputs] + self.model = LLM(model=model, task="score") - def evaluate( - self, - prompt: None | str = None, - responses: None | list[str] = None, - ground_truth: None | str = None, - ) -> list[str]: - conv1_tokenized, conv2_tokenized = self._wrap_prompt( - prompt, responses, ground_truth - ) + def _postprocess_output(self, outputs: list[ScoringRequestOutput]) -> list[float]: + return [output.outputs.score for output in outputs] - with torch.no_grad(): - score1 = self.model(**conv1_tokenized).logits[0][0].item() - score2 = self.model(**conv2_tokenized).logits[0][0].item() - print(f"Score for response 1: {score1}") - print(f"Score for response 2: {score2}") - # response: List[Completion] = await self.generate._method(self, wrapped_prompt) - # return self._postprocess_output(response) + def evaluate(self, prompt: str, responses: list[str]) -> list[str]: + outputs: list[ScoringRequestOutput] = self.model.score(prompt, responses) + return self._postprocess_output(outputs) From 60d87e9579f3b005cfd2baf463c189ae32e4a986 Mon Sep 17 00:00:00 2001 From: Jack-Khuu Date: Tue, 7 Oct 2025 11:06:28 -0700 Subject: [PATCH 3/3] Push to move device --- apps/vllm/llama3_8b.yaml | 1 + apps/vllm/rewardjudge.py | 4 ++++ src/forge/actors/judge.py | 2 ++ src/forge/actors/policy.py | 6 ++++++ 4 files changed, 13 insertions(+) diff --git a/apps/vllm/llama3_8b.yaml b/apps/vllm/llama3_8b.yaml index c4bc141bf..64d6d3444 100644 --- a/apps/vllm/llama3_8b.yaml +++ b/apps/vllm/llama3_8b.yaml @@ -6,6 +6,7 @@ policy: tensor_parallel_size: 2 pipeline_parallel_size: 1 enforce_eager: true + # task: "score" sampling_config: n: 2 guided_decoding: false diff --git a/apps/vllm/rewardjudge.py b/apps/vllm/rewardjudge.py index 207442c68..42ed0e15c 100644 --- a/apps/vllm/rewardjudge.py +++ b/apps/vllm/rewardjudge.py @@ -12,6 +12,10 @@ def run(): + # metric_logging_cfg = cfg.get("metric_logging", {"console": {"log_per_rank": False}}) + # mlogger = await get_or_create_metric_logger() + # await mlogger.init_backends.call_one(metric_logging_cfg) + prompt = "Jane has 12 apples. 
She gives 4 apples to her friend Mark, \ then buys 1 more apple, and finally splits all her apples equally among \ herself and her 2 siblings. How many apples does each person get?" diff --git a/src/forge/actors/judge.py b/src/forge/actors/judge.py index 887f23754..a74304c7d 100644 --- a/src/forge/actors/judge.py +++ b/src/forge/actors/judge.py @@ -197,8 +197,10 @@ class RewardModelJudge: """ def __init__(self, model: str): + # def __init__(self, cfgmodel: str): self.model_name = model self.model = LLM(model=model, task="score") + # policy = await Policy.options(**cfg.services.policy).as_service(**cfg.policy) def _postprocess_output(self, outputs: list[ScoringRequestOutput]) -> list[float]: return [output.outputs.score for output in outputs] diff --git a/src/forge/actors/policy.py b/src/forge/actors/policy.py index 464674f2c..05c1e15ee 100644 --- a/src/forge/actors/policy.py +++ b/src/forge/actors/policy.py @@ -329,6 +329,7 @@ async def generate(self, prompt: str, priority: int = 0) -> list[Completion]: t.step("prompt_truncation") # process and tokenize prompt + # (forge/issues/332) Signature of this changes prompt_str, request = self.processor.process_inputs( request_id=request_id, prompt=prompt_dict, @@ -340,6 +341,7 @@ async def generate(self, prompt: str, priority: int = 0) -> list[Completion]: priority=priority, data_parallel_rank=None, ) + print("Pooling: ", request.pooling_params) t.step("process_inputs") # Wait until we're accepting requests (releases lock while waiting) @@ -590,6 +592,10 @@ async def setup(self): @endpoint async def execute_model(self, schedule: SchedulerOutput): + # print("execute_model: ", self.worker.get_supported_pooling_tasks()) + # print("execute_model: ", schedule.scheduled_new_reqs) + # print("execute_model: ", schedule.scheduled_new_reqs[0].pooling_params) + # print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~") return self.worker.execute_model(schedule) async def _load_tensor_parallel_state_dict(
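
Note: the series converges on vLLM's scoring task for reward-model inference (PATCH 2/3 replaces the transformers-based RewardModelJudge with a thin wrapper around vllm.LLM). Below is a minimal standalone sketch of that final call path, not a verified script: it assumes a vLLM build that accepts task="score", network or local access to the Skywork checkpoint, and a GPU with enough memory; constructor arguments and output fields can differ across vLLM versions.

from vllm import LLM

# Same toy example as apps/vllm/rewardjudge.py, abbreviated here.
prompt = (
    "Jane has 12 apples. She gives 4 apples to her friend Mark, then buys 1 "
    "more apple, and finally splits all her apples equally among herself and "
    "her 2 siblings. How many apples does each person get?"
)
responses = [
    "12 - 4 = 8, 8 + 1 = 9, 9 / 3 = 3. Each person gets 3 apples.",    # correct
    "12 - 4 = 8, 8 + 1 = 9, 9 / 2 = 4.5. Each person gets 4 apples.",  # incorrect
]

# task="score" runs the checkpoint as a scoring model (as in PATCH 2).
# llm.score pairs the single prompt with every response and returns one
# ScoringRequestOutput per response; .outputs.score is the reward value.
llm = LLM(model="Skywork/Skywork-Reward-V2-Llama-3.1-8B", task="score")
outputs = llm.score(prompt, responses)
scores = [o.outputs.score for o in outputs]  # higher score = preferred response
print(scores)

This is the same flow RewardModelJudge.evaluate exposes after PATCH 2: construct the judge once (the LLM is loaded in __init__), then call evaluate(prompt=..., responses=...) to get one score per candidate response.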