from typing import Tuple, Optional, List, Dict
from easydict import EasyDict
from torch.utils.tensorboard import SummaryWriter
- from transformers import AutoTokenizer
+ from transformers import AutoTokenizer, AutoModel
+ import torch
+ import torch.nn.functional as F
import re

from ding.utils import REWARD_MODEL_REGISTRY
@@ -13,8 +15,8 @@ class MathRewardModel(BaseRewardModel):
    config = dict(
        # (str) The type of the reward model.
        type='math',
-         # (str) The name of the tokenizer, usually the huggingface tokenizer name.
-         tokenizer_name='Qwen/Qwen2.5-Math-PRM-7B',
+         # (str) The name of the tokenizer and model.
+         model_name='Qwen/Qwen2.5-Math-PRM-7B',
    )

    def __init__(self, config: EasyDict, device: str, logger, tb_logger: 'SummaryWriter') -> None:  # noqa
@@ -23,23 +25,127 @@ def __init__(self, config: EasyDict, device: str, logger, tb_logger: 'SummaryWri
        self.logger = logger
        self.tb_logger = tb_logger

-     def estimate(self, data: List[str]) -> List[Dict]:
+         # Initialize the tokenizer and the reward model
+         self.tokenizer = AutoTokenizer.from_pretrained(self.cfg.model_name, trust_remote_code=True)
+         self.model = AutoModel.from_pretrained(
+             self.cfg.model_name, device_map=self.device, torch_dtype=torch.bfloat16, trust_remote_code=True
+         )
+         self.model.eval()
+
+     def make_step_rewards(self, logits: torch.Tensor, token_masks: torch.Tensor) -> List[List[float]]:
+         """Calculate step-wise rewards from the model outputs."""
+         probabilities = F.softmax(logits, dim=-1)
+         probabilities = probabilities * token_masks.unsqueeze(-1)  # bs, seq_len, num_labels
+
+         all_scores_res = []
+         for i in range(probabilities.size(0)):
+             sample = probabilities[i]  # seq_len, num_labels
+             positive_probs = sample[sample != 0].view(-1, 2)[:, 1]  # (num_steps,): positive-label probability at each <extra_0> token
+             non_zero_elements_list = positive_probs.cpu().tolist()
+             all_scores_res.append(non_zero_elements_list)
+         return all_scores_res
+
+     def estimate(self, data: List[Dict]) -> List[Dict]:
        """
+         Overview:
+             Estimate rewards for mathematical reasoning steps using the Qwen2.5-Math-PRM-7B model.
        Arguments:
-             - data (:obj:`List[str]`): The list of data queries used for estimation, each query is a string \
-                 of the \
-                 form "1 + 1 = ?"
+             - data (:obj:`List[Dict]`): List of dictionaries containing:
+                 - system (:obj:`str`): System prompt for the model.
+                 - query (:obj:`str`): The mathematical query to be evaluated.
+                 - response (:obj:`List[str]`): List of reasoning steps.
        Returns:
-             - reward (:obj:`List[Dict]`): The estimated reward.
+             - reward (:obj:`List[Dict]`): List of dictionaries containing:
+                 - reward (:obj:`float`): Final reward (the reward of the last step).
+                 - metadata (:obj:`Dict`): Additional information including:
+                     - query (:obj:`str`): Original query.
+                     - step_rewards (:obj:`List[float]`): Rewards for each reasoning step.
+                     - num_steps (:obj:`int`): Number of reasoning steps.
+         Shapes:
+             - input_ids (:obj:`torch.LongTensor`): :math:`(B, L)`, where B is the batch size and L is the sequence length.
+             - outputs (:obj:`torch.FloatTensor`): :math:`(B, L, C)`, where C is the number of reward labels.
+             - token_masks (:obj:`torch.BoolTensor`): :math:`(B, L)`.
+             - step_rewards (:obj:`List[List[float]]`): List of length B, each entry containing S rewards, where S is the number of steps.
+         Examples:
+             >>> data = [{
+             >>>     "system": "Please reason step by step...",
+             >>>     "query": "What is 1 + 1?",
+             >>>     "response": ["First, we have 1", "Then add 1", "Therefore, 1 + 1 = 2"]
+             >>> }]
+             >>> results = model.estimate(data)
+             >>> print(results[0]["reward"])  # 1.0
+             >>> print(results[0]["metadata"]["step_rewards"])  # [0.8, 0.9, 1.0]
        """
-         pass
+         # Process all samples as one batch
+         all_messages = []
+         for item in data:
+             messages = [
+                 {
+                     "role": "system",
+                     "content": item['system']
+                 },
+                 {
+                     "role": "user",
+                     "content": item['query']
+                 },
+                 {
+                     "role": "assistant",
+                     "content": "<extra_0>".join(item['response']) + "<extra_0>"
+                 },
+             ]
+             all_messages.append(messages)
+
+         # Convert every conversation to the model's chat input format
+         conversation_strs = [
+             self.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=False)
+             for messages in all_messages
+         ]
+
+         # Encode the inputs as a batch
+         input_ids = self.tokenizer(
+             conversation_strs, return_tensors="pt", padding=True, truncation=True
+         )["input_ids"].to(self.model.device)
+
+         # Run a single batched forward pass
+         with torch.no_grad():
+             outputs = self.model(input_ids=input_ids)
+
+         # Compute the step rewards of each sample
+         step_sep_id = self.tokenizer.encode("<extra_0>")[0]
+         token_masks = (input_ids == step_sep_id)
+         batch_rewards = self.make_step_rewards(outputs[0], token_masks)
+
+         # Build the detailed result dictionaries
+         results = []
+         for item, step_rewards in zip(data, batch_rewards):
+             results.append(
+                 {
+                     "reward": step_rewards[-1] if step_rewards else 0.0,  # the last step's reward is used as the overall reward
+                     "metadata": {
+                         "query": item['query'],
+                         "step_rewards": step_rewards,  # reward of each reasoning step
+                         "num_steps": len(item['response']),
+                     }
+                 }
+             )
+
+         return results

-     # rule-based reward model does not need training, thus the following methods are empty
    def train(self):
+         """
+         Training is not implemented for this reward model as it uses a pre-trained model.
+         """
+         self.logger.warning("Training is not implemented for this reward model")
        pass

    def collect_data(self, data: list) -> None:
+         """
+         Data collection is not needed for this reward model.
+         """
        pass

    def clear_data(self) -> None:
+         """
+         Data clearing is not needed for this reward model.
+         """
        pass
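Below is a minimal usage sketch of the new `estimate` interface, not part of the diff above. The constructor signature, the `model_name` config key, and the input/output dict layout follow the code in this commit; the import path, device string, system prompt, and logger setup are illustrative assumptions.

```python
import logging

from easydict import EasyDict
from torch.utils.tensorboard import SummaryWriter

# Assumed import; the actual module path depends on where this file lives in the repo.
# from ding.reward_model.math_reward_model import MathRewardModel

cfg = EasyDict(type='math', model_name='Qwen/Qwen2.5-Math-PRM-7B')
reward_model = MathRewardModel(
    config=cfg,
    device='cuda',  # forwarded to device_map when loading the HuggingFace model
    logger=logging.getLogger(__name__),
    tb_logger=SummaryWriter('./tb_log'),
)

# One dict per sample: system prompt, query, and the list of reasoning steps.
data = [{
    "system": "Please reason step by step, and put your final answer within \\boxed{}.",
    "query": "What is 1 + 1?",
    "response": ["First, we start with 1.", "Then we add 1.", "Therefore, 1 + 1 = \\boxed{2}."],
}]

results = reward_model.estimate(data)
print(results[0]["reward"])                    # reward of the final step
print(results[0]["metadata"]["step_rewards"])  # one reward per <extra_0>-separated step
```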