diff --git a/cookbooks/training_judge_model/grpo/chat_rl_dataset.py b/cookbooks/training_judge_model/grpo/chat_rl_dataset.py
index f991887a..99595fbd 100644
--- a/cookbooks/training_judge_model/grpo/chat_rl_dataset.py
+++ b/cookbooks/training_judge_model/grpo/chat_rl_dataset.py
@@ -17,48 +17,47 @@
from typing import List, Union
import datasets
+import verl.utils.torch_functional as verl_F
from omegaconf import DictConfig, ListConfig
from torch.utils.data import Dataset
from transformers import PreTrainedTokenizer
-
-import verl.utils.torch_functional as verl_F
from verl.utils.model import compute_position_id_with_mask
-# 注意:已移除 pydantic 模板类以避免 Ray pickle 序列化问题
+# Note: Removed pydantic template classes to avoid Ray pickle serialization issues
class BaseChatRLDataset(Dataset):
- """聊天强化学习数据集基类"""
+ """Base class for chat reinforcement learning dataset."""
def __init__(
self,
data_files: Union[str, List[str]],
tokenizer: PreTrainedTokenizer,
config: DictConfig,
- processor=None, # 保持向后兼容性,但不使用
- max_samples: int = -1, # 添加 max_samples 参数
+ processor=None, # Keep for backward compatibility, but not used
+ max_samples: int = -1, # Add max_samples parameter
):
- # 初始化基本属性
+ # Initialize basic attributes
self.data_files = self._normalize_data_files(data_files)
self.original_data_files = copy.deepcopy(self.data_files)
self.tokenizer = tokenizer
self.config = config
self.max_samples = max_samples
-
- # 加载配置设置
+
+ # Load configuration settings
self._load_config()
-
- # 加载和处理数据
+
+ # Load and process data
self._load_dataset()
def _normalize_data_files(self, data_files):
- """将数据文件转换为列表格式"""
+ """Convert data files to list format."""
if not isinstance(data_files, (List, ListConfig)):
data_files = [data_files]
return copy.deepcopy(data_files)
def _load_config(self):
- """加载配置参数"""
+ """Load configuration parameters."""
self.cache_dir = os.path.expanduser(self.config.get("cache_dir", "~/.cache/verl/rlhf"))
self.prompt_key = self.config.get("prompt_key", "prompt")
self.max_prompt_length = self.config.get("max_prompt_length", 1024)
@@ -66,146 +65,141 @@ def _load_config(self):
self.truncation = self.config.get("truncation", "error")
self.filter_overlong_prompts = self.config.get("filter_overlong_prompts", True)
self.num_workers = min(
- self.config.get("filter_overlong_prompts_workers", max(1, os.cpu_count() // 4)),
- os.cpu_count()
+ self.config.get("filter_overlong_prompts_workers", max(1, os.cpu_count() // 4)), os.cpu_count()
)
self.serialize_dataset = False
def _download_files(self):
- """下载文件到本地缓存"""
+ """Download files to local cache."""
from verl.utils.fs import copy_to_local
-
+
for i, file in enumerate(self.data_files):
self.data_files[i] = copy_to_local(src=file, cache_dir=self.cache_dir)
def _load_dataset(self):
- """加载和处理数据集"""
+ """Load and process dataset."""
self._download_files()
-
- # 加载parquet文件
+
+ # Load parquet files
dataframes = []
for file in self.data_files:
df = datasets.load_dataset("parquet", data_files=file)["train"]
dataframes.append(df)
-
+
self.dataframe = datasets.concatenate_datasets(dataframes)
total = len(self.dataframe)
- print(f"数据集长度: {total}")
-
- # 处理 max_samples 参数
+ print(f"Dataset length: {total}")
+
+ # Handle max_samples parameter
if self.max_samples > 0 and self.max_samples < total:
import numpy as np
+
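+            # Note: this simply takes the first max_samples rows in order (no shuffling)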
indices = np.arange(self.max_samples)
self.dataframe = self.dataframe.select(indices.tolist())
- print(f"选择了 {self.max_samples} 个样本(共 {total} 个)")
-
- # 过滤过长的提示
+ print(f"Selected {self.max_samples} samples (total: {total})")
+
+ # Filter overlong prompts
if self.filter_overlong_prompts:
self._filter_long_prompts()
def _filter_long_prompts(self):
- """过滤掉过长的提示"""
- # 提取 tokenizer 和参数到局部变量,避免 pickle 序列化问题
+ """Filter out overlong prompts."""
+ # Extract tokenizer and params to local variables to avoid pickle serialization issues
tokenizer = self.tokenizer
max_length = self.max_prompt_length
prompt_key = self.prompt_key
-
+
def is_prompt_valid(doc):
try:
- # 内联提取 prompt 逻辑,避免调用 self 方法
+ # Inline prompt extraction logic to avoid calling self methods
prompt = ""
if "input" in doc and doc["input"]:
for msg in doc["input"]:
if isinstance(msg, dict) and msg.get("role") == "user" and msg.get("content"):
prompt = msg["content"]
break
-
+
if not prompt:
- # 回退到其他字段
+ # Fallback to other fields
prompt = doc.get(prompt_key, "")
if isinstance(prompt, list) and prompt:
prompt = prompt[0].get("content", "") if isinstance(prompt[0], dict) else str(prompt[0])
-
+
if not prompt:
- return True # 如果无法提取 prompt,保留该样本
-
+ return True # Keep sample if prompt cannot be extracted
+
return len(tokenizer.encode(prompt)) <= max_length
except Exception as e:
- print(f"过滤时出错: {e}")
- return True # 出错时保留该样本
-
+ print(f"Error during filtering: {e}")
+ return True # Keep sample on error
+
original_len = len(self.dataframe)
self.dataframe = self.dataframe.filter(
is_prompt_valid,
- num_proc=1, # 使用单进程避免序列化问题
- desc=f"过滤长度超过 {max_length} tokens的提示",
+ num_proc=1, # Use single process to avoid serialization issues
+ desc=f"Filtering prompts exceeding {max_length} tokens",
)
- print(f"过滤后数据集长度: {len(self.dataframe)} (原始: {original_len})")
+ print(f"Dataset length after filtering: {len(self.dataframe)} (original: {original_len})")
def _extract_prompt(self, example):
- """从样本中提取提示"""
- # 首先尝试新的数据结构
+ """Extract prompt from example."""
+ # First try new data structure
if "input" in example and example["input"]:
for msg in example["input"]:
if msg.get("role") == "user" and msg.get("content"):
return msg["content"]
-
- # 回退到旧的数据结构
+
+ # Fallback to old data structure
prompt = example.get(self.prompt_key)
if prompt is None:
prompt = example.get("x", [])
if prompt:
return prompt[-1].get("content", "")
-
+
if isinstance(prompt, str):
- return prompt[:self.max_prompt_length]
+ return prompt[: self.max_prompt_length]
elif isinstance(prompt, list) and prompt:
return prompt[0].get("content", "") if isinstance(prompt[0], dict) else str(prompt[0])
-
+
return ""
def _build_messages(self, example: dict) -> List[dict]:
- """从样本构建聊天消息 - 子类需要重写"""
+ """Build chat messages from example - subclasses must override."""
raise NotImplementedError("Subclasses must implement _build_messages")
def _format_template(self, messages: List[dict], example: dict) -> str:
- """格式化模板 - 子类需要重写"""
+ """Format template - subclasses must override."""
raise NotImplementedError("Subclasses must implement _format_template")
def _extract_ground_truth(self, row_dict):
- """提取真实标签 - 子类需要重写"""
+ """Extract ground truth label - subclasses must override."""
raise NotImplementedError("Subclasses must implement _extract_ground_truth")
def __getitem__(self, item):
- """获取数据集中的一个项目"""
+ """Get an item from the dataset."""
row_dict = dict(self.dataframe[item])
messages = self._build_messages(row_dict)
-
- # 格式化提示
+
+ # Format prompt
raw_prompt_messages = self._format_template(messages, row_dict)
- # 尝试使用 enable_thinking 参数,如果不支持则回退
+ # Try using enable_thinking parameter, fallback if not supported
try:
raw_prompt = self.tokenizer.apply_chat_template(
- raw_prompt_messages,
- add_generation_prompt=True,
- tokenize=False,
- enable_thinking=True
+ raw_prompt_messages, add_generation_prompt=True, tokenize=False, enable_thinking=True
)
except TypeError:
- # 如果 tokenizer 不支持 enable_thinking 参数,则不使用
+ # If tokenizer doesn't support enable_thinking parameter, skip it
raw_prompt = self.tokenizer.apply_chat_template(
- raw_prompt_messages,
- add_generation_prompt=True,
- tokenize=False
+ raw_prompt_messages, add_generation_prompt=True, tokenize=False
)
-
- # 分词
+
+ # Tokenize
model_inputs = self.tokenizer(raw_prompt, return_tensors="pt", add_special_tokens=False)
input_ids = model_inputs["input_ids"]
attention_mask = model_inputs["attention_mask"]
-
- # 后处理
+
+ # Post-process
input_ids, attention_mask = verl_F.postprocess_data(
input_ids=input_ids,
attention_mask=attention_mask,
@@ -214,21 +208,21 @@ def __getitem__(self, item):
left_pad=True,
truncation=self.truncation,
)
-
- # 计算位置ID
+
+ # Compute position IDs
position_ids = compute_position_id_with_mask(attention_mask)
-
- # 准备原始提示ID
+
+ # Prepare raw prompt IDs
raw_prompt_ids = self.tokenizer.encode(raw_prompt, add_special_tokens=False)
if len(raw_prompt_ids) > self.max_prompt_length:
if self.truncation == "left":
- raw_prompt_ids = raw_prompt_ids[-self.max_prompt_length:]
+ raw_prompt_ids = raw_prompt_ids[-self.max_prompt_length :]
elif self.truncation == "right":
- raw_prompt_ids = raw_prompt_ids[:self.max_prompt_length]
+ raw_prompt_ids = raw_prompt_ids[: self.max_prompt_length]
elif self.truncation == "error":
- raise RuntimeError(f"提示长度 {len(raw_prompt_ids)} 超过 {self.max_prompt_length}")
-
- # 构建结果
+ raise RuntimeError(f"Prompt length {len(raw_prompt_ids)} exceeds {self.max_prompt_length}")
+
+ # Build result
result = {
"input_ids": input_ids[0],
"attention_mask": attention_mask[0],
@@ -239,26 +233,26 @@ def __getitem__(self, item):
"reward_model": {"ground_truth": self._extract_ground_truth(row_dict)},
"data_source": row_dict.get("source", "helpsteer2"),
}
-
+
if self.return_raw_chat:
result["raw_prompt"] = messages
-
+
return result
def __len__(self):
return len(self.dataframe)
def resume_dataset_state(self):
- """恢复数据集状态用于检查点"""
+ """Resume dataset state for checkpointing."""
self.serialize_dataset = not hasattr(self, "original_data_files")
if not self.serialize_dataset:
self.data_files = copy.deepcopy(self.original_data_files)
self._load_dataset()
else:
- print("使用旧的数据加载器检查点文件,建议从头开始训练")
+ print("Using old dataloader checkpoint file, recommend training from scratch")
def __getstate__(self):
- """获取用于序列化的状态"""
+ """Get state for serialization."""
if not self.serialize_dataset:
state = self.__dict__.copy()
if "dataframe" in state:
@@ -268,25 +262,25 @@ def __getstate__(self):
class PairwiseChatRLDataset(BaseChatRLDataset):
- """Pairwise聊天强化学习数据集"""
-
+ """Pairwise chat reinforcement learning dataset."""
+
def __init__(self, data_files, tokenizer, config, processor=None, max_samples: int = -1):
super().__init__(data_files, tokenizer, config, processor, max_samples)
- # Pairwise相关配置
- self.pairwise_response_index = self.config.get("pairwise_response_index", 0) # 选择哪个response进行训练
- print(f"使用 Pairwise 模式,选择 response index: {self.pairwise_response_index}")
+ # Pairwise related configuration
+ self.pairwise_response_index = self.config.get("pairwise_response_index", 0) # Which response to train on
+ print(f"Using Pairwise mode, selected response index: {self.pairwise_response_index}")
def _build_messages(self, example: dict) -> List[dict]:
- """从样本构建聊天消息 - Pairwise模式"""
+ """Build chat messages from example - Pairwise mode."""
messages = []
-
- # 从input字段提取用户消息
+
+ # Extract user message from input field
if "input" in example and example["input"]:
for msg in example["input"]:
if msg.get("role") == "user" and msg.get("content"):
messages.append({"role": "user", "content": msg["content"]})
-
- # Pairwise模式:选择指定的response
+
+ # Pairwise mode: select the specified response
if "output" in example and example["output"]:
if self.pairwise_response_index < len(example["output"]):
output_item = example["output"][self.pairwise_response_index]
@@ -295,17 +289,17 @@ def _build_messages(self, example: dict) -> List[dict]:
content = answer.get("content", "")
if content:
messages.append({"role": "assistant", "content": content})
-
- # 回退到原始结构
+
+ # Fallback to original structure
if len(messages) <= 1:
prompt = self._extract_prompt(example)
if prompt:
messages = [{"role": "user", "content": prompt}]
-
+
return messages
def _format_template(self, messages: List[dict], example: dict) -> str:
- """格式化pairwise模板"""
+ """Format pairwise template."""
task_desc = """You are a professional expert in response comparison.
You will be provided with a query and two different responses (A and B) to that query.
Your task is to determine which response is better by comparing their quality across multiple dimensions.
@@ -316,23 +310,23 @@ def _format_template(self, messages: List[dict], example: dict) -> str:
"Accuracy: Factual correctness and reliability of information",
"Safety: Avoiding harmful or inappropriate content",
]
-
- # 提取问题
- query = next((msg['content'] for msg in messages if msg['role'] == 'user'), '')
-
- # 获取两个回答
+
+ # Extract query
+ query = next((msg["content"] for msg in messages if msg["role"] == "user"), "")
+
+ # Get two responses
response_a = ""
response_b = ""
-
+
if "output" in example and len(example["output"]) >= 2:
response_a = example["output"][0].get("answer", {}).get("content", "")
response_b = example["output"][1].get("answer", {}).get("content", "")
-
- # 直接使用字符串格式化,避免使用 PairwiseTrainTemplate 类(防止 pickle 序列化问题)
+
+ # Use string formatting directly to avoid PairwiseTrainTemplate class (prevent pickle serialization issues)
principles_str = ""
for i, principle in enumerate(principles):
principles_str += f"{i + 1}. {principle}\n"
-
+
prompt = f"""# Task Description
{task_desc}
# Principles
@@ -351,50 +345,50 @@ def _format_template(self, messages: List[dict], example: dict) -> str:
return [{"role": "user", "content": prompt}]
def _extract_ground_truth(self, row_dict):
- """提取pairwise真实标签"""
+ """Extract pairwise ground truth label."""
try:
output_data = row_dict.get("output", [])
if output_data and len(output_data) >= 2:
- # 获取选中response的标签
+ # Get label from selected response
selected_answer = output_data[self.pairwise_response_index].get("answer", {})
if isinstance(selected_answer, dict):
label_data = selected_answer.get("label", {})
if isinstance(label_data, dict):
- # 对于pairwise,返回偏好信息
+ # For pairwise, return preference information
preference = label_data.get("preference", "")
strength = label_data.get("preference_strength", 0)
response_id = label_data.get("response_id", "")
-
+
return {
"preference": preference,
"preference_strength": strength,
"response_id": response_id,
- "task_type": "pairwise"
+ "task_type": "pairwise",
}
-
+
return ""
except:
return ""
class PointwiseChatRLDataset(BaseChatRLDataset):
- """Pointwise聊天强化学习数据集 - 用于单个回答的质量评分"""
-
+ """Pointwise chat reinforcement learning dataset - for single response quality scoring."""
+
def __init__(self, data_files, tokenizer, config, processor=None, max_samples: int = -1):
super().__init__(data_files, tokenizer, config, processor, max_samples)
- print(f"使用 Pointwise 模式")
+ print("Using Pointwise mode")
def _build_messages(self, example: dict) -> List[dict]:
- """从样本构建聊天消息 - Pointwise模式"""
+ """Build chat messages from example - Pointwise mode."""
messages = []
-
- # 从input字段提取用户消息
+
+ # Extract user message from input field
if "input" in example and example["input"]:
for msg in example["input"]:
if isinstance(msg, dict) and msg.get("role") == "user" and msg.get("content"):
messages.append({"role": "user", "content": msg["content"]})
-
- # Pointwise模式:获取第一个response
+
+ # Pointwise mode: get first response
if "output" in example and example["output"]:
output_item = example["output"][0] if isinstance(example["output"], list) else example["output"]
answer = output_item.get("answer", {}) if isinstance(output_item, dict) else {}
@@ -402,17 +396,17 @@ def _build_messages(self, example: dict) -> List[dict]:
content = answer.get("content", "")
if content:
messages.append({"role": "assistant", "content": content})
-
- # 回退到原始结构
+
+ # Fallback to original structure
if len(messages) <= 1:
prompt = self._extract_prompt(example)
if prompt:
messages = [{"role": "user", "content": prompt}]
-
+
return messages
def _format_template(self, messages: List[dict], example: dict) -> str:
- """格式化pointwise模板"""
+ """Format pointwise template."""
task_desc = """You are a professional expert in response quality evaluation.
You will be provided with a query and a response to that query.
Your task is to evaluate the quality of the response and assign a helpfulness score from 0 to 4.
@@ -426,22 +420,22 @@ def _format_template(self, messages: List[dict], example: dict) -> str:
"Relevance: How directly related the response is to the question",
"Safety: Avoiding harmful or inappropriate content",
]
-
- # 提取问题
- query = next((msg['content'] for msg in messages if msg['role'] == 'user'), '')
-
- # 获取回答
+
+ # Extract query
+ query = next((msg["content"] for msg in messages if msg["role"] == "user"), "")
+
+ # Get response
response = ""
if "output" in example and example["output"]:
output_item = example["output"][0] if isinstance(example["output"], list) else example["output"]
if isinstance(output_item, dict):
response = output_item.get("answer", {}).get("content", "")
-
- # 直接使用字符串格式化
+
+ # Use string formatting directly
principles_str = ""
for i, principle in enumerate(principles):
principles_str += f"{i + 1}. {principle}\n"
-
+
prompt = f"""# Task Description
{task_desc}
# Principles
@@ -456,7 +450,7 @@ def _format_template(self, messages: List[dict], example: dict) -> str:
return [{"role": "user", "content": prompt}]
def _extract_ground_truth(self, row_dict):
- """提取pointwise真实标签"""
+ """Extract pointwise ground truth label."""
try:
output_data = row_dict.get("output", [])
if output_data:
@@ -466,16 +460,13 @@ def _extract_ground_truth(self, row_dict):
if isinstance(answer, dict):
label_data = answer.get("label", {})
if isinstance(label_data, dict):
- # 对于pointwise,返回评分信息
+ # For pointwise, return scoring information
helpfulness = label_data.get("helpfulness", 0)
- return {
- "helpfulness": helpfulness,
- "task_type": "pointwise"
- }
-
+ return {"helpfulness": helpfulness, "task_type": "pointwise"}
+
return {"helpfulness": 0, "task_type": "pointwise"}
except:
return {"helpfulness": 0, "task_type": "pointwise"}
-# 向后兼容的别名
+# Backward compatible aliases
diff --git a/cookbooks/training_judge_model/grpo/pairwise/reward_fn.py b/cookbooks/training_judge_model/grpo/pairwise/reward_fn.py
index a98be20e..1ac348bc 100644
--- a/cookbooks/training_judge_model/grpo/pairwise/reward_fn.py
+++ b/cookbooks/training_judge_model/grpo/pairwise/reward_fn.py
@@ -1,100 +1,93 @@
-import torch
-import json
import re
-from datetime import datetime
-import os
-from collections import defaultdict
def filter_thinking_parts(text):
"""
- 过滤文本中的思考部分(用于Qwen3等支持thinking模式的模型)
-
- 支持的思考标记格式:
+ Filter thinking parts from text (for models like Qwen3 that support thinking mode).
+
+ Supported thinking tag formats:
    - <think>...</think>
"""
if not isinstance(text, str):
return text
-
- # 定义思考部分的正则表达式模式
- thinking_patterns = [
-        r'<think>.*?</think>'
- ]
-
- # 依次应用所有模式进行过滤
+
+ # Define regex patterns for thinking parts
+    thinking_patterns = [r"<think>.*?</think>"]
+
+ # Apply all patterns sequentially for filtering
filtered_text = text
for pattern in thinking_patterns:
- filtered_text = re.sub(pattern, '', filtered_text, flags=re.DOTALL | re.IGNORECASE)
-
- # 清理多余的空白字符
- filtered_text = re.sub(r'\n\s*\n', '\n\n', filtered_text) # 合并多个换行
+ filtered_text = re.sub(pattern, "", filtered_text, flags=re.DOTALL | re.IGNORECASE)
+
+ # Clean up extra whitespace
+ filtered_text = re.sub(r"\n\s*\n", "\n\n", filtered_text) # Merge multiple newlines
filtered_text = filtered_text.strip()
-
+
return filtered_text
def extract_preference_response(response_text):
"""
- 从模型回复中提取preference偏好
- 从标签中提取偏好选择
+    Extract the preference from the model response.
+    The preference choice is parsed from the dedicated tag in the output.
"""
# Handle case where response_text might not be a string
if not isinstance(response_text, str):
response_text = str(response_text)
-
- # 从标签中提取偏好
- preference_pattern = r'(.*?)'
+
+ # Extract preference from tag
+ preference_pattern = r"(.*?)"
match = re.search(preference_pattern, response_text, re.DOTALL)
-
+
if match:
preference_content = match.group(1).strip().upper()
-
- # 首先检查是否直接是A或B
- if preference_content == 'A':
- return 'A'
- elif preference_content == 'B':
- return 'B'
- elif preference_content == 'TIE':
- return 'tie'
-
- # 然后检查是否包含特定词汇但不是两者都有
- if 'A' in preference_content and 'B' not in preference_content:
- return 'A'
- elif 'B' in preference_content and 'A' not in preference_content:
- return 'B'
- elif 'TIE' in preference_content or ('A' in preference_content and 'B' in preference_content):
- return 'tie'
-
- # 如果没有找到标签,尝试从文本最后部分提取
- lines = response_text.strip().split('\n')
- for line in reversed(lines[-5:]): # 检查最后5行
+
+ # First check if it's directly A or B
+ if preference_content == "A":
+ return "A"
+ elif preference_content == "B":
+ return "B"
+ elif preference_content == "TIE":
+ return "tie"
+
+ # Then check if it contains specific words but not both
+ if "A" in preference_content and "B" not in preference_content:
+ return "A"
+ elif "B" in preference_content and "A" not in preference_content:
+ return "B"
+ elif "TIE" in preference_content or ("A" in preference_content and "B" in preference_content):
+ return "tie"
+
+ # If no tag found, try to extract from the last part of text
+ lines = response_text.strip().split("\n")
+ for line in reversed(lines[-5:]): # Check last 5 lines
line = line.strip().upper()
- if line == 'A' or 'RESPONSE A' in line or 'ANSWER A' in line:
- return 'A'
- elif line == 'B' or 'RESPONSE B' in line or 'ANSWER B' in line:
- return 'B'
- elif 'TIE' in line or 'EQUAL' in line:
- return 'tie'
-
- return 'unknown' # 如果无法提取,返回unknown
+ if line == "A" or "RESPONSE A" in line or "ANSWER A" in line:
+ return "A"
+ elif line == "B" or "RESPONSE B" in line or "ANSWER B" in line:
+ return "B"
+ elif "TIE" in line or "EQUAL" in line:
+ return "tie"
+
+ return "unknown" # Return unknown if extraction fails
def calculate_pairwise_reward(predicted_preference, true_preference, response_id):
"""
- 基于preference预测与真实偏好的匹配程度计算奖励
-
+ Calculate reward based on how well the predicted preference matches the true preference.
+
Args:
- predicted_preference: 模型预测的偏好 ('A', 'B', 'tie', 'unknown')
- true_preference: 真实偏好 ('A', 'B', 'tie')
- response_id: 当前response的ID ('A' 或 'B')
-
+ predicted_preference: Model's predicted preference ('A', 'B', 'tie', 'unknown')
+ true_preference: Ground truth preference ('A', 'B', 'tie')
+ response_id: Current response ID ('A' or 'B')
+
Returns:
- float: 奖励分数 (1.0 如果预测正确,0.0 如果预测错误)
+ float: Reward score (1.0 if prediction is correct, 0.0 if incorrect)
"""
- if true_preference is None or predicted_preference == 'unknown':
+ if true_preference is None or predicted_preference == "unknown":
return 0.0
-
- # 简化奖励逻辑:预测正确给1分,预测错误给0分
+
+ # Simplified reward logic: 1 point for correct prediction, 0 for incorrect
if predicted_preference == true_preference:
return 1.0
else:
@@ -103,79 +96,75 @@ def calculate_pairwise_reward(predicted_preference, true_preference, response_id
def compute_score(data_source, solution_str, ground_truth, extra_info=None, **kwargs):
"""
- 与naive.py兼容的compute_score函数,处理pairwise比较任务
-
- 参数:
- - data_source: 数据源类型
- - solution_str: 模型生成的回复
- - ground_truth: 真实标签(包含偏好信息)
- - extra_info: 额外信息
+ compute_score function compatible with naive.py, handles pairwise comparison tasks.
+
+ Args:
+ data_source: Data source type
+ solution_str: Model generated response
+ ground_truth: Ground truth label (contains preference information)
+ extra_info: Additional information
"""
try:
- # 先过滤掉思考部分(支持Qwen3等模型的thinking模式)
+ # First filter out thinking parts (support thinking mode for models like Qwen3)
filtered_solution = filter_thinking_parts(solution_str)
-
- # 从过滤后的solution_str中提取preference
+
+ # Extract preference from filtered solution_str
predicted_preference = extract_preference_response(filtered_solution)
-
- # 处理ground_truth - 应该包含偏好信息
+
+ # Handle ground_truth - should contain preference information
if isinstance(ground_truth, dict):
- true_preference = ground_truth.get('preference', 'tie')
- response_id = ground_truth.get('response_id', 'A')
- preference_strength = ground_truth.get('preference_strength', 0)
- task_type = ground_truth.get('task_type', 'pairwise')
+ true_preference = ground_truth.get("preference", "tie")
+ response_id = ground_truth.get("response_id", "A")
+ preference_strength = ground_truth.get("preference_strength", 0)
+ task_type = ground_truth.get("task_type", "pairwise")
else:
- # 回退处理
+ # Fallback handling
if extra_info and isinstance(extra_info, dict):
- # 尝试从extra_info中获取偏好信息
- data_mode = extra_info.get('data_mode', 'pointwise')
- if data_mode == 'pairwise':
- # 分析原始数据
- output_data = extra_info.get('output', [])
+ # Try to get preference info from extra_info
+ data_mode = extra_info.get("data_mode", "pointwise")
+ if data_mode == "pairwise":
+ # Analyze original data
+ output_data = extra_info.get("output", [])
if output_data and len(output_data) >= 2:
- # 从原始标签中推断偏好
- label_a = output_data[0].get('answer', {}).get('label', {})
- label_b = output_data[1].get('answer', {}).get('label', {})
-
- pref_a = label_a.get('overall_preference', 0)
- pref_b = label_b.get('overall_preference', 0)
-
+ # Infer preference from original labels
+ label_a = output_data[0].get("answer", {}).get("label", {})
+ label_b = output_data[1].get("answer", {}).get("label", {})
+
+ pref_a = label_a.get("overall_preference", 0)
+ pref_b = label_b.get("overall_preference", 0)
+
if pref_a > pref_b:
- true_preference = 'A'
+ true_preference = "A"
elif pref_b > pref_a:
- true_preference = 'B'
+ true_preference = "B"
else:
- true_preference = 'tie'
-
- # 假设我们在评估第一个response (A)
- response_id = 'A'
+ true_preference = "tie"
+
+ # Assume we're evaluating the first response (A)
+ response_id = "A"
preference_strength = abs(pref_a - pref_b)
- task_type = 'pairwise'
+ task_type = "pairwise"
else:
- true_preference = 'tie'
- response_id = 'A'
+ true_preference = "tie"
+ response_id = "A"
preference_strength = 0
- task_type = 'pairwise'
+ task_type = "pairwise"
else:
- # 不是pairwise任务,返回默认值
- return {
- "score": 0.0,
- "error": "Not a pairwise task",
- "data_source": data_source
- }
+ # Not a pairwise task, return default values
+ return {"score": 0.0, "error": "Not a pairwise task", "data_source": data_source}
else:
- true_preference = 'tie'
- response_id = 'A'
+ true_preference = "tie"
+ response_id = "A"
preference_strength = 0
- task_type = 'pairwise'
-
- # 计算奖励
+ task_type = "pairwise"
+
+ # Calculate reward
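+        # Note: response_id is passed through but unused by the simplified exact-match reward below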
reward = calculate_pairwise_reward(predicted_preference, true_preference, response_id)
-
- # 计算准确率
- accuracy = 1.0 if (predicted_preference == true_preference and predicted_preference != 'unknown') else 0.0
- # 返回详细信息
+ # Calculate accuracy
+ accuracy = 1.0 if (predicted_preference == true_preference and predicted_preference != "unknown") else 0.0
+
+ # Return detailed information
return {
"score": reward,
"predicted_preference": predicted_preference,
@@ -184,23 +173,18 @@ def compute_score(data_source, solution_str, ground_truth, extra_info=None, **kw
"response_id": response_id,
"preference_strength": preference_strength,
"task_type": task_type,
- "data_source": data_source
+ "data_source": data_source,
}
-
+
except Exception as e:
print(f"Error in compute_score: {e}")
- # 返回默认值
- return {
- "score": 0.0,
- "accuracy": 0.0,
- "error": str(e),
- "data_source": data_source
- }
+ # Return default values
+ return {"score": 0.0, "accuracy": 0.0, "error": str(e), "data_source": data_source}
if __name__ == "__main__":
- # 测试用例 - 模拟模型的实际输出
- model_response = '''Let me analyze both responses based on the given principles:
+ # Test cases - simulate model's actual output
+ model_response = """Let me analyze both responses based on the given principles:
1. Helpfulness: Response A provides detailed step-by-step instructions including washing, peeling, cutting, soaking, and drying. Response B only mentions cutting and frying, missing crucial preparation steps.
@@ -214,39 +198,29 @@ def compute_score(data_source, solution_str, ground_truth, extra_info=None, **kw
Response A is significantly better as it provides complete, accurate, and helpful instructions for preparing potatoes for frying.
-A'''
-
- # 测试better标签提取
+A"""
+
+    # Test preference tag extraction
extracted_pref = extract_preference_response(model_response)
- print(f"提取的偏好: {extracted_pref}")
-
- # 模拟ground_truth数据
- ground_truth = {
- "preference": "A",
- "preference_strength": 2,
- "response_id": "A",
- "task_type": "pairwise"
- }
-
- # 测试reward计算
+ print(f"Extracted preference: {extracted_pref}")
+
+ # Simulate ground_truth data
+ ground_truth = {"preference": "A", "preference_strength": 2, "response_id": "A", "task_type": "pairwise"}
+
+ # Test reward calculation
result = compute_score("helpsteer3", model_response, ground_truth)
print(f"Reward result: {result}")
-
- # 测试不同的预测结果
+
+ # Test different prediction results
test_cases = [
- ("A", "A", "A"), # 正确预测A更好,当前是A
- ("A", "A", "B"), # 正确预测A更好,当前是B
- ("B", "A", "A"), # 错误预测B更好,当前是A
- ("tie", "A", "A"), # 预测tie,真实A更好,当前是A
+ ("A", "A", "A"), # Correct prediction A is better, current is A
+ ("A", "A", "B"), # Correct prediction A is better, current is B
+ ("B", "A", "A"), # Wrong prediction B is better, current is A
+ ("tie", "A", "A"), # Predict tie, true is A better, current is A
]
-
- print("\n=== 测试不同预测结果 ===")
+
+ print("\n=== Testing different prediction results ===")
for pred, true, resp_id in test_cases:
- test_gt = {
- "preference": true,
- "preference_strength": 1,
- "response_id": resp_id,
- "task_type": "pairwise"
- }
+ test_gt = {"preference": true, "preference_strength": 1, "response_id": resp_id, "task_type": "pairwise"}
reward = calculate_pairwise_reward(pred, true, resp_id)
- print(f"预测: {pred}, 真实: {true}, Response ID: {resp_id} -> 奖励: {reward:.1f}")
\ No newline at end of file
+ print(f"Predicted: {pred}, True: {true}, Response ID: {resp_id} -> Reward: {reward:.1f}")
diff --git a/cookbooks/training_judge_model/grpo/pointwise/reward_fn.py b/cookbooks/training_judge_model/grpo/pointwise/reward_fn.py
index 98571486..c0f76b81 100644
--- a/cookbooks/training_judge_model/grpo/pointwise/reward_fn.py
+++ b/cookbooks/training_judge_model/grpo/pointwise/reward_fn.py
@@ -1,168 +1,156 @@
-import torch
-import json
-from datetime import datetime
-import os
import re
-from collections import defaultdict
def filter_thinking_parts(text):
"""
- 过滤文本中的思考部分(用于Qwen3等支持thinking模式的模型)
-
- 支持的思考标记格式:
+ Filter thinking parts from text (for models like Qwen3 that support thinking mode).
+
+ Supported thinking tag formats:
    - <think>...</think>
"""
if not isinstance(text, str):
return text
-
- # 定义思考部分的正则表达式模式
- thinking_patterns = [
-        r'<think>.*?</think>'
- ]
-
- # 依次应用所有模式进行过滤
+
+ # Define regex patterns for thinking parts
+    thinking_patterns = [r"<think>.*?</think>"]
+
+ # Apply all patterns sequentially for filtering
filtered_text = text
for pattern in thinking_patterns:
- filtered_text = re.sub(pattern, '', filtered_text, flags=re.DOTALL | re.IGNORECASE)
-
- # 清理多余的空白字符
- filtered_text = re.sub(r'\n\s*\n', '\n\n', filtered_text) # 合并多个换行
+ filtered_text = re.sub(pattern, "", filtered_text, flags=re.DOTALL | re.IGNORECASE)
+
+ # Clean up extra whitespace
+ filtered_text = re.sub(r"\n\s*\n", "\n\n", filtered_text) # Merge multiple newlines
filtered_text = filtered_text.strip()
-
+
return filtered_text
def extract_helpfulness_score(response_text):
"""
- 从模型回复中提取helpfulness评分
- 从标签中提取分数
+    Extract the helpfulness score from the model response.
+    The score is parsed from the dedicated tag in the output.
"""
# Handle case where response_text might not be a string
if not isinstance(response_text, str):
response_text = str(response_text)
-
- # 从标签中提取分数
- score_pattern = r'(.*?)'
+
+ # Extract score from tag
+ score_pattern = r"(.*?)"
match = re.search(score_pattern, response_text, re.DOTALL)
-
+
if match:
score_content = match.group(1).strip()
- # 提取其中的数字
- numbers = re.findall(r'\d+', score_content)
+ # Extract numbers from content
+ numbers = re.findall(r"\d+", score_content)
if numbers:
try:
- score = int(numbers[0]) # 取第一个数字作为分数
- if 0 <= score <= 4: # 假设分数范围是0-4
+ score = int(numbers[0]) # Take the first number as score
+ if 0 <= score <= 4: # Assume score range is 0-4
return score
except:
pass
-
- return 0 # 如果无法提取,默认为0
+
+ return 0 # Default to 0 if extraction fails
+
def calculate_helpfulness_reward(predicted_score, true_score):
"""
- 基于helpfulness预测分数与真实分数的差异计算奖励
- 差异越小,奖励越高
-
- 对于二分类场景 (true_score为0或1):
- - 预测正确(完全匹配)→ 奖励1.0
- - 预测错误 → 奖励0.0
+ Calculate reward based on the difference between predicted and true helpfulness scores.
+ Smaller difference results in higher reward.
+
+ For binary classification scenarios (true_score is 0 or 1):
+ - Correct prediction (exact match) -> Reward 1.0
+ - Wrong prediction -> Reward 0.0
"""
if true_score is None:
return 0.0
-
- # 计算差异
+
+ # Calculate difference
diff = abs(predicted_score - true_score)
-
- # 对于二分类场景(0或1),采用精确匹配
+
+ # For binary classification (0 or 1), use exact match
if true_score in [0, 1]:
return 1.0 if diff == 0 else 0.0
-
- # 对于多分类场景(0-4),采用差异计算
- # 将差异转换为奖励分数 (差异越小奖励越高)
+
+ # For multi-class scenarios (0-4), use difference calculation
+ # Convert difference to reward score (smaller difference = higher reward)
max_possible_diff = 4
normalized_diff = min(diff / max_possible_diff, 1.0)
-
- # 奖励 = 1 - 标准化差异
+
+ # Reward = 1 - normalized difference
reward = 1.0 - normalized_diff
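+    # Example: predicted=1, true=4 -> diff=3, normalized_diff=0.75, reward=0.25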
-
+
return reward
+
def compute_score(data_source, solution_str, ground_truth, extra_info=None, **kwargs):
"""
- 与naive.py兼容的compute_score函数
- 参数:
- - data_source: 数据源类型
- - solution_str: 模型生成的回复
- - ground_truth: 真实标签(从reward_model字段获取)
- - extra_info: 额外信息
+ compute_score function compatible with naive.py.
+
+ Args:
+ data_source: Data source type
+ solution_str: Model generated response
+ ground_truth: Ground truth label (obtained from reward_model field)
+ extra_info: Additional information
"""
try:
- # 先过滤掉思考部分(支持Qwen3等模型的thinking模式)
+ # First filter out thinking parts (support thinking mode for models like Qwen3)
filtered_solution = filter_thinking_parts(solution_str)
-
- # 从过滤后的solution_str中提取helpfulness分数
+
+ # Extract helpfulness score from filtered solution_str
predicted_helpfulness = extract_helpfulness_score(filtered_solution)
-
- # 处理ground_truth - 可能是数字或者字典
+
+ # Handle ground_truth - could be a number or dict
if isinstance(ground_truth, dict):
- true_helpfulness = ground_truth.get('helpfulness', 0)
+ true_helpfulness = ground_truth.get("helpfulness", 0)
elif isinstance(ground_truth, (int, float)):
true_helpfulness = int(ground_truth)
elif isinstance(ground_truth, str) and ground_truth.isdigit():
true_helpfulness = int(ground_truth)
else:
- # 如果ground_truth不可用,尝试从extra_info中获取
+ # If ground_truth is unavailable, try to get from extra_info
if extra_info and isinstance(extra_info, dict):
- output_data = extra_info.get('output', [])
+ output_data = extra_info.get("output", [])
if output_data and len(output_data) > 0:
- label_data = output_data[0].get('label', {})
- true_helpfulness = label_data.get('helpfulness', 0)
+ label_data = output_data[0].get("label", {})
+ true_helpfulness = label_data.get("helpfulness", 0)
else:
true_helpfulness = 0
else:
true_helpfulness = 0
-
- # 计算奖励
+
+ # Calculate reward
reward = calculate_helpfulness_reward(predicted_helpfulness, true_helpfulness)
-
- # 返回详细信息
+
+ # Return detailed information
return {
"score": reward,
"predicted_helpfulness": predicted_helpfulness,
"true_helpfulness": true_helpfulness,
- "data_source": data_source
+ "data_source": data_source,
}
-
+
except Exception as e:
print(f"Error in compute_score: {e}")
- # 返回默认值
- return {
- "score": 0.0,
- "error": str(e),
- "data_source": data_source
- }
+ # Return default values
+ return {"score": 0.0, "error": str(e), "data_source": data_source}
+
if __name__ == "__main__":
- # 测试用例
- test_response = '''Let me analyze this answer step by step:
+ # Test cases
+ test_response = """Let me analyze this answer step by step:
1. First, I'll check if the answer is well-structured...
4. Finally, I'll look at the overall helpfulness...
-2'''
-
+2"""
+
ground_truth = {"helpfulness": 3, "task_type": "pointwise"}
-
- # 测试 compute_score 函数
- result = compute_score(
- data_source="test",
- solution_str=test_response,
- ground_truth=ground_truth
- )
-
- print(f"Test Result:")
+
+ # Test compute_score function
+ result = compute_score(data_source="test", solution_str=test_response, ground_truth=ground_truth)
+
+ print("Test Result:")
print(f" Predicted Score: {result.get('predicted_helpfulness')}")
print(f" True Score: {result.get('true_helpfulness')}")
print(f" Reward: {result.get('score')}")
-
diff --git a/cookbooks/zero_shot_evaluation/report_generator.py b/cookbooks/zero_shot_evaluation/report_generator.py
new file mode 100644
index 00000000..d6f0c059
--- /dev/null
+++ b/cookbooks/zero_shot_evaluation/report_generator.py
@@ -0,0 +1,277 @@
+# -*- coding: utf-8 -*-
+"""Report generator for zero-shot evaluation results."""
+
+import asyncio
+from typing import List
+
+from cookbooks.zero_shot_evaluation.schema import (
+ ComparisonDetail,
+ OpenAIEndpoint,
+ TaskConfig,
+)
+from cookbooks.zero_shot_evaluation.zero_shot_pipeline import EvaluationResult
+from openjudge.models.openai_chat_model import OpenAIChatModel
+
+# Constants for report generation
+_NUM_WINNING_EXAMPLES_FOR_RANKING = 2
+_NUM_LOSING_EXAMPLES_FOR_RANKING = 1
+_NUM_SAMPLE_REASONS_PER_MODEL = 3
+
+
+class ReportGenerator:
+ """Generate evaluation report with parallel LLM calls."""
+
+ def __init__(
+ self,
+ judge_endpoint: OpenAIEndpoint,
+ language: str = "zh",
+ include_examples: int = 3,
+ ):
+ self.language = language
+ self.include_examples = include_examples
+ extra_params = judge_endpoint.extra_params or {}
+ self.model = OpenAIChatModel(
+ model=judge_endpoint.model,
+ api_key=judge_endpoint.api_key,
+ base_url=judge_endpoint.base_url,
+ temperature=extra_params.get("temperature", 0.3),
+ )
+
+ async def generate(
+ self,
+ task_config: TaskConfig,
+ rubrics: List[str],
+ result: EvaluationResult,
+ details: List[ComparisonDetail],
+ ) -> str:
+ """Generate complete report with parallel section generation."""
+ # Prepare context
+ ctx = self._prepare_context(task_config, rubrics, result, details)
+
+ # Generate sections in parallel
+ sections = await asyncio.gather(
+ self._gen_summary(ctx),
+ self._gen_ranking_explanation(ctx),
+ self._gen_model_analysis(ctx),
+ self._gen_examples(ctx),
+ )
+
+ # Assemble report
+ lang_title = "评估报告" if self.language == "zh" else "Evaluation Report"
+ header = f"# {lang_title}\n\n"
+ return header + "\n\n---\n\n".join(s for s in sections if s)
+
+ def _prepare_context(
+ self,
+ task_config: TaskConfig,
+ rubrics: List[str],
+ result: EvaluationResult,
+ details: List[ComparisonDetail],
+ ) -> dict:
+ """Prepare shared context for all sections."""
+ # Filter to only original order (remove swapped duplicates)
+ original_details = [d for d in details if d.order == "original"]
+
+ # Format rankings
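+        # result.rankings is expected to be a list of (model_name, win_rate) tuples, best first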
+ rankings_text = "\n".join(f"{i+1}. {name}: {rate:.1%}" for i, (name, rate) in enumerate(result.rankings))
+
+ # Format rubrics
+ rubrics_text = "\n".join(f"- {r}" for r in rubrics)
+
+ # Group details by model pair for examples
+ model_examples = {}
+ for d in original_details:
+ key = tuple(sorted([d.model_a, d.model_b]))
+ if key not in model_examples:
+ model_examples[key] = []
+ model_examples[key].append(d)
+
+ # Select representative examples (prefer those with detailed reasons)
+ selected_examples = []
+ for pair_details in model_examples.values():
+ sorted_details = sorted(pair_details, key=lambda x: len(x.reason), reverse=True)
+ selected_examples.extend(sorted_details[: self.include_examples])
+
+ return {
+ "task_description": task_config.description,
+ "scenario": task_config.scenario or "",
+ "rubrics": rubrics_text,
+ "rankings": rankings_text,
+ "win_matrix": result.win_matrix,
+ "total_queries": result.total_queries,
+ "total_comparisons": result.total_comparisons,
+ "best_model": result.best_pipeline,
+ "model_names": [name for name, _ in result.rankings],
+ "examples": selected_examples[: self.include_examples * 3],
+ "all_details": original_details, # Use deduplicated details
+ }
+
+ async def _call_llm(self, prompt: str) -> str:
+ """Call LLM with given prompt."""
+ lang_instruction = "Output in Chinese (中文)." if self.language == "zh" else "Output in English."
+ messages = [
+ {"role": "system", "content": f"You are an expert AI evaluation analyst. {lang_instruction}"},
+ {"role": "user", "content": prompt},
+ ]
+ response = await self.model.achat(messages=messages)
+ return response.content or ""
+
+ async def _gen_summary(self, ctx: dict) -> str:
+ """Generate executive summary."""
+ prompt = f"""Generate a concise executive summary for an AI model evaluation.
+
+Task: {ctx['task_description']}
+Scenario: {ctx['scenario']}
+
+Evaluation Statistics:
+- Total test queries: {ctx['total_queries']}
+- Total pairwise comparisons: {ctx['total_comparisons']}
+
+Final Rankings:
+{ctx['rankings']}
+
+Best performing model: {ctx['best_model']}
+
+Requirements:
+- Write 150-200 words
+- Include: evaluation purpose, methodology summary, key findings, winner
+- Use professional tone"""
+
+ content = await self._call_llm(prompt)
+ title = "## 执行摘要" if self.language == "zh" else "## Executive Summary"
+ return f"{title}\n\n{content}"
+
+ async def _gen_ranking_explanation(self, ctx: dict) -> str:
+ """Generate ranking explanation with evidence."""
+ # Find key examples showing why top model won/lost
+ best = ctx["best_model"]
+
+ # Best model wins: either (model_a=best and winner=model_a) or (model_b=best and winner=model_b)
+ winning_examples = [
+ d
+ for d in ctx["all_details"]
+ if (d.model_a == best and d.winner == "model_a") or (d.model_b == best and d.winner == "model_b")
+ ][:_NUM_WINNING_EXAMPLES_FOR_RANKING]
+
+ # Best model loses: either (model_a=best and winner=model_b) or (model_b=best and winner=model_a)
+ losing_examples = [
+ d
+ for d in ctx["all_details"]
+ if (d.model_a == best and d.winner == "model_b") or (d.model_b == best and d.winner == "model_a")
+ ][:_NUM_LOSING_EXAMPLES_FOR_RANKING]
+
+ examples_text = ""
+ for i, ex in enumerate(winning_examples + losing_examples, 1):
+ actual_winner = ex.model_a if ex.winner == "model_a" else ex.model_b
+ examples_text += f"""
+Example {i}:
+- Query: {ex.query[:200]}...
+- Winner: {actual_winner}
+- Reason: {ex.reason}
+"""
+
+ prompt = f"""Explain why the models are ranked this way based on the evaluation.
+
+Rankings:
+{ctx['rankings']}
+
+Evaluation Criteria:
+{ctx['rubrics']}
+
+Win Matrix (row beats column with this rate):
+{self._format_win_matrix(ctx['win_matrix'])}
+
+Key Examples:
+{examples_text}
+
+Requirements:
+- Explain why {ctx['best_model']} ranks first
+- Highlight key differences between top models
+- Reference specific evidence from examples
+- Be objective and balanced"""
+
+ content = await self._call_llm(prompt)
+ title = "## 排名解释" if self.language == "zh" else "## Ranking Explanation"
+ return f"{title}\n\n{content}"
+
+ async def _gen_model_analysis(self, ctx: dict) -> str:
+ """Generate per-model analysis."""
+ # Collect stats for each model
+ model_stats = {name: {"wins": 0, "losses": 0, "reasons": []} for name in ctx["model_names"]}
+
+ for d in ctx["all_details"]:
+ winner = d.model_a if d.winner == "model_a" else d.model_b
+ loser = d.model_b if d.winner == "model_a" else d.model_a
+ model_stats[winner]["wins"] += 1
+ model_stats[loser]["losses"] += 1
+ if d.reason:
+ model_stats[winner]["reasons"].append(f"[Win] {d.reason[:150]}")
+ model_stats[loser]["reasons"].append(f"[Loss] {d.reason[:150]}")
+
+ stats_text = ""
+ for name in ctx["model_names"]:
+ stats = model_stats[name]
+ sample_reasons = stats["reasons"][:_NUM_SAMPLE_REASONS_PER_MODEL]
+ reasons_text = "\n".join(" * " + r for r in sample_reasons)
+ stats_text += f"""
+Model: {name}
+- Wins: {stats['wins']}, Losses: {stats['losses']}
+- Sample evaluation reasons:
+{reasons_text}
+"""
+
+ prompt = f"""Analyze each model's performance in this evaluation.
+
+Task: {ctx['task_description']}
+
+Evaluation Criteria:
+{ctx['rubrics']}
+
+Model Statistics:
+{stats_text}
+
+Requirements:
+For each model, provide:
+1. Overall assessment (2-3 sentences)
+2. Key strengths (with evidence)
+3. Key weaknesses (with evidence)
+4. Improvement suggestions"""
+
+ content = await self._call_llm(prompt)
+ title = "## 模型分析" if self.language == "zh" else "## Model Analysis"
+ return f"{title}\n\n{content}"
+
+ async def _gen_examples(self, ctx: dict) -> str:
+ """Generate showcase examples."""
+ examples = ctx["examples"][: self.include_examples]
+ if not examples:
+ return ""
+
+ examples_text = ""
+ for i, ex in enumerate(examples, 1):
+ examples_text += f"""
+### Case {i}
+
+**Query:** {ex.query}
+
+**{ex.model_a}:**
+{ex.response_a[:500]}{'...' if len(ex.response_a) > 500 else ''}
+
+**{ex.model_b}:**
+{ex.response_b[:500]}{'...' if len(ex.response_b) > 500 else ''}
+
+**Winner:** {ex.model_a if ex.winner == 'model_a' else ex.model_b}
+
+**Evaluation Reason:** {ex.reason}
+"""
+
+ title = "## 典型案例" if self.language == "zh" else "## Representative Cases"
+ return f"{title}\n{examples_text}"
+
+ def _format_win_matrix(self, win_matrix: dict) -> str:
+ """Format win matrix for display."""
+ lines = []
+ for model_a, opponents in win_matrix.items():
+ for model_b, rate in opponents.items():
+ lines.append(f" {model_a} vs {model_b}: {rate:.1%}")
+ return "\n".join(lines)