Skip to content

Commit 8d52441

Browse files
pre-commit-ci[bot]YeAnbang
authored andcommitted
[pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
1 parent a246bf2 commit 8d52441

File tree

2 files changed

+51
-39
lines changed

2 files changed

+51
-39
lines changed

applications/ColossalChat/coati/distributed/consumer.py

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -117,36 +117,47 @@ def loop(self) -> None:
117117
# receive data from producers
118118
for r in range(self.num_producers):
119119
print(f"[T{dist.get_rank()}] Recv data episode {episode} step {step} from {r}")
120-
raw_batch = ray_broadcast_tensor_dict(None, src=0, device=self.device, group_name=f"sync_data_{r}")
120+
raw_batch = ray_broadcast_tensor_dict(
121+
None, src=0, device=self.device, group_name=f"sync_data_{r}"
122+
)
121123
# calculate group reward et al. filtering. As only the filtered group will be used for training (which is incomplete),
122124
# we need to calculate the metrics before filtering here for logging
123125
# [batch_size, num_generations, ...] -> [batch_size * num_generations, ...]
124-
raw_batch_with_reward = self.calculate_reward({k:v.view(-1, v.size(-1)) if k!='temperature' else v for k, v in raw_batch.items()})
125-
raw_batch_with_reward = {k: v.view(-1, self.num_generations, v.size(-1)) if k!='temperature' else v for k, v in raw_batch_with_reward.items()}
126-
# [batch_size, num_generations] -> [batch_size]
127-
group_reward_mean = raw_batch_with_reward["reward"][:,:,0].mean(dim=-1)
128-
group_format_acc_mean = raw_batch_with_reward["format_acc"][:,:,0].mean(dim=-1)
129-
group_ans_acc_mean = raw_batch_with_reward["ans_acc"][:,:,0].mean(dim=-1)
130-
group_response_len = (
131-
(raw_batch_with_reward["response_idx"][:, :, 1] - raw_batch_with_reward["response_idx"][:, :, 0] + 1)
132-
.type(torch.float32)
133-
.mean(dim=-1)
126+
raw_batch_with_reward = self.calculate_reward(
127+
{k: v.view(-1, v.size(-1)) if k != "temperature" else v for k, v in raw_batch.items()}
134128
)
129+
raw_batch_with_reward = {
130+
k: v.view(-1, self.num_generations, v.size(-1)) if k != "temperature" else v
131+
for k, v in raw_batch_with_reward.items()
132+
}
133+
# [batch_size, num_generations] -> [batch_size]
134+
reward = raw_batch_with_reward["reward"][:, :, 0]
135+
format_acc = raw_batch_with_reward["format_acc"][:, :, 0]
136+
ans_acc = raw_batch_with_reward["ans_acc"][:, :, 0]
137+
response_len = (
138+
raw_batch_with_reward["response_idx"][:, :, 1]
139+
- raw_batch_with_reward["response_idx"][:, :, 0]
140+
+ 1
141+
).type(torch.float32)
135142
effective_group_mask = None
136143
if self.filter_range is not None and self.grpo_config.get("dynamic_batching", True):
137144
# filter the group based on the reward and accuracy
138145
effective_group_mask = torch.logical_and(
139146
group_ans_acc_mean > self.filter_range[0], group_ans_acc_mean < self.filter_range[1]
140147
)
141-
raw_batch_with_reward = unbind_batch(raw_batch_with_reward) # List[Dict[str, torch.Tensor]]
148+
raw_batch_with_reward = unbind_batch(raw_batch_with_reward) # List[Dict[str, torch.Tensor]]
142149
for group_idx, group_with_reward in enumerate(raw_batch_with_reward):
143150
self.buffer.append(
144151
[
145-
group_with_reward if effective_group_mask is None or effective_group_mask[group_idx] else None,
146-
group_reward_mean[group_idx],
147-
group_format_acc_mean[group_idx],
148-
group_ans_acc_mean[group_idx],
149-
group_response_len[group_idx],
152+
(
153+
group_with_reward
154+
if effective_group_mask is None or effective_group_mask[group_idx]
155+
else None
156+
),
157+
reward[group_idx],
158+
format_acc[group_idx],
159+
ans_acc[group_idx],
160+
response_len[group_idx],
150161
]
151162
)
152163
if effective_group_mask is not None:
@@ -160,7 +171,9 @@ def loop(self) -> None:
160171
effective_group_to_raw_group_mapping[len(effective_group_to_raw_group_mapping)] = (
161172
buffer_idx
162173
)
163-
pbar.set_postfix({"Collect Effective Prompt": f"{len(effective_group_to_raw_group_mapping)}/{self.dp_size * self.minibatch_size}"})
174+
print(
175+
f"[T{dist.get_rank()}] Collect Effective Prompt: {len(effective_group_to_raw_group_mapping)}/{self.dp_size * self.minibatch_size}"
176+
)
164177

165178
while len(effective_group_to_raw_group_mapping) >= self.dp_size * self.minibatch_size:
166179
# on each dp_rank, we use minibatch_size effective samples to form a batch

applications/ColossalChat/coati/distributed/grpo_consumer.py

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -84,12 +84,9 @@ def __init__(
8484
self.project_name = project_name
8585
self.effective_sample_count = 0
8686
self.effective_prompt_count = 0
87-
<<<<<<< HEAD
88-
=======
8987
self.total_sample_count = 0
9088
self.overlength_samples = 0
9189
self.total_overlength_samples = 0
92-
>>>>>>> c8b368c2 (add overlength sample count (#6332))
9390
self.project_name = project_name
9491
self.run_name = run_name
9592
self.wandb_group_name = wandb_group_name
@@ -218,10 +215,18 @@ def step(self, step_idx: int, pbar: Any, **kwargs) -> Optional[float]:
218215
loss_mask,
219216
action_mask[:, -1] == False,
220217
)
221-
222-
self.overlength_samples = (old_loss_mask & ~loss_mask).sum().item()
223-
self.overlength_samples = all_reduce_sum(
224-
torch.tensor(self.overlength_samples, device=loss_mask.device), self.plugin
218+
if self.filter_range is not None and self.grpo_config.get("dynamic_batching", False) == False:
219+
# filter out samples with reward outside the range
220+
# if dynamic batching is enabled, we filter out out of range groups before training
221+
group_ans_acc_mean = (
222+
ans_acc.view(-1, self.num_generations).mean(dim=1).repeat_interleave(self.num_generations, dim=-1)
223+
)
224+
loss_mask = torch.logical_and(
225+
loss_mask,
226+
torch.logical_and(
227+
group_ans_acc_mean > self.filter_range[0],
228+
group_ans_acc_mean < self.filter_range[1],
229+
),
225230
)
226231
self.total_overlength_samples += self.overlength_samples.item()
227232

@@ -448,33 +453,27 @@ def _criterion(outputs, inputs):
448453
self.optimizer.step()
449454
self.optimizer.zero_grad()
450455
self.global_step += 1
451-
<<<<<<< HEAD
452-
sample_utilization = self.effective_sample_count / len(self.raw_train_batch_reward) / self.num_generations
453-
self.effective_prompt_count = 0
454-
self.effective_sample_count = 0
455-
=======
456456
sample_utilization = self.effective_sample_count / self.total_sample_count
457457
overlength_samples_percentage = self.total_overlength_samples / self.total_sample_count
458458
self.effective_prompt_count = 0
459459
self.effective_sample_count = 0
460460
self.total_sample_count = 0
461461
self.total_overlength_samples = 0
462-
>>>>>>> c8b368c2 (add overlength sample count (#6332))
463462
loss_scalar = self.accum_loss.item()
464463
if not self.plugin.pp_size > 1 or (
465464
self.plugin.pp_size > 1 and self.booster.plugin.stage_manager.is_last_stage() and self.tp_rank == 0
466465
):
467466
if (not self.plugin.pp_size > 1 and self.rank == 0) or (
468467
self.plugin.pp_size > 1 and self.booster.plugin.stage_manager.is_last_stage() and self.tp_rank == 0
469468
):
470-
raw_batch_reward_mean = sum(self.raw_train_batch_reward) / len(self.raw_train_batch_reward)
471-
raw_batch_format_acc_mean = sum(self.raw_train_batch_format_acc) / len(
472-
self.raw_train_batch_format_acc
473-
)
474-
raw_batch_ans_acc_mean = sum(self.raw_train_batch_ans_acc) / len(self.raw_train_batch_ans_acc)
475-
raw_batch_response_len_mean = sum(self.raw_train_batch_response_len) / len(
476-
self.raw_train_batch_response_len
477-
)
469+
raw_batch_reward_mean = torch.cat(self.raw_train_batch_reward, dim=0).mean().cpu().item()
470+
raw_batch_format_acc_mean = torch.cat(self.raw_train_batch_format_acc, dim=0).mean().cpu().item()
471+
raw_batch_ans_acc_mean = torch.cat(self.raw_train_batch_ans_acc, dim=0).mean().cpu().item()
472+
raw_batch_response_len = torch.cat(self.raw_train_batch_response_len, dim=0)
473+
raw_batch_response_len_mean = raw_batch_response_len.mean().cpu().item()
474+
overlength_samples_ratio = (
475+
(raw_batch_response_len >= action_mask.size(-1)).to(float).mean().cpu().item()
476+
) # not an exact figure, but a close estimate
478477
self.raw_train_batch_reward = []
479478
self.raw_train_batch_format_acc = []
480479
self.raw_train_batch_ans_acc = []

0 commit comments

Comments
 (0)