
Commit ee939d9

address conversation
1 parent 58f8c9b commit ee939d9

3 files changed: +36 -68 lines changed

applications/ColossalChat/coati/distributed/consumer.py

Lines changed: 34 additions & 33 deletions

@@ -118,48 +118,49 @@ def loop(self) -> None:
                 for r in range(self.num_producers):
                     print(f"[T{dist.get_rank()}] Recv data episode {episode} step {step} from {r}")
                     raw_batch = ray_broadcast_tensor_dict(None, src=0, device=self.device, group_name=f"sync_data_{r}")
-                    recv_effective_count = 0
                     # calculate group reward et al. filtering. As only the filtered group will be used for training (which is incomplete),
                     # we need to calculate the metrics before filtering here for logging
-                    raw_batch_with_reward = unbind_batch(self.calculate_reward(raw_batch))
-                    for group_with_reward in raw_batch_with_reward:
-                        group_reward_mean = group_with_reward["reward"].mean().cpu().item()
-                        group_format_acc_mean = group_with_reward["format_acc"].mean().cpu().item()
-                        group_ans_acc_mean = group_with_reward["ans_acc"].mean().cpu().item()
-                        group_response_len = (
-                            (
-                                group_with_reward["response_idx"][:, 1]
-                                - group_with_reward["response_idx"][:, 0]
-                                + 1
-                            )
-                            .type(torch.float32)
-                            .mean()
-                            .cpu()
-                            .item()
+                    # [batch_size, num_generations, ...] -> [batch_size * num_generations, ...]
+                    raw_batch_with_reward = self.calculate_reward({k:v.view(-1, v.size(-1)) if k!='temperature' else v for k, v in raw_batch.items()})
+                    raw_batch_with_reward = {k: v.view(-1, self.num_generations, v.size(-1)) if k!='temperature' else v for k, v in raw_batch_with_reward.items()}
+                    # [batch_size, num_generations] -> [batch_size]
+                    group_reward_mean = raw_batch_with_reward["reward"][:,:,0].mean(dim=-1)
+                    group_format_acc_mean = raw_batch_with_reward["format_acc"][:,:,0].mean(dim=-1)
+                    group_ans_acc_mean = raw_batch_with_reward["ans_acc"][:,:,0].mean(dim=-1)
+                    group_response_len = (
+                        (raw_batch_with_reward["response_idx"][:, :, 1] - raw_batch_with_reward["response_idx"][:, :, 0] + 1)
+                        .type(torch.float32)
+                        .mean(dim=-1)
+                    )
+                    effective_group_mask = None
+                    if self.filter_range is not None and self.grpo_config.get("dynamic_batching", True):
+                        # filter the group based on the reward and accuracy
+                        effective_group_mask = torch.logical_and(
+                            group_ans_acc_mean > self.filter_range[0], group_ans_acc_mean < self.filter_range[1]
                         )
-                        if self.grpo_config.get("dynamic_batching", True):
-                            filtered_group = self.prompt_level_filtering(group_with_reward)
-                            recv_effective_count += 1 if filtered_group is not None else 0
+                    raw_batch_with_reward = unbind_batch(raw_batch_with_reward)  # List[Dict[str, torch.Tensor]]
+                    for group_idx, group_with_reward in enumerate(raw_batch_with_reward):
                         self.buffer.append(
                             [
-                                filtered_group,
-                                group_reward_mean,
-                                group_format_acc_mean,
-                                group_ans_acc_mean,
-                                group_response_len,
+                                group_with_reward if effective_group_mask is None or effective_group_mask[group_idx] else None,
+                                group_reward_mean[group_idx],
+                                group_format_acc_mean[group_idx],
+                                group_ans_acc_mean[group_idx],
+                                group_response_len[group_idx],
                             ]
                         )
-                    if self.filter_range is not None:
+                    if effective_group_mask is not None:
                         print(
-                            f"[T{dist.get_rank()}] Filter recv data: {len(raw_batch)} -> {recv_effective_count}"
+                            f"[T{dist.get_rank()}] Filter recv data: {len(raw_batch)} -> {torch.sum(effective_group_mask).cpu().item()} effective groups"
                         )
-                        # mapping the effective group to the raw group for indexing
-                        effective_group_to_raw_group_mapping = {}
-                        for buffer_idx in range(len(self.buffer)):
-                            if self.buffer[buffer_idx][0] is not None:
-                                effective_group_to_raw_group_mapping[len(effective_group_to_raw_group_mapping)] = (
-                                    buffer_idx
-                                )
+                    # mapping the effective group to the raw group for indexing
+                    effective_group_to_raw_group_mapping = {}
+                    for buffer_idx in range(len(self.buffer)):
+                        if self.buffer[buffer_idx][0] is not None:
+                            effective_group_to_raw_group_mapping[len(effective_group_to_raw_group_mapping)] = (
+                                buffer_idx
+                            )
+                    pbar.set_postfix({"Collect Effective Prompt": f"{len(effective_group_to_raw_group_mapping)}/{self.dp_size * self.minibatch_size}"})

                 while len(effective_group_to_raw_group_mapping) >= self.dp_size * self.minibatch_size:
                     # on each dp_rank, we use minibatch_size effective samples to form a batch

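The key change in this file is that per-group metrics and the accept/reject decision are now computed in one vectorized pass over the whole received batch, replacing the old Python loop that called prompt_level_filtering() per group. A minimal standalone sketch of that masking pattern (the sizes and filter_range below are made-up stand-ins, not the trainer's actual config):

    import torch

    # Hypothetical shapes and threshold, for illustration only.
    batch_size, num_generations = 4, 8
    filter_range = (0.01, 0.99)
    ans_acc = torch.rand(batch_size, num_generations)  # stand-in per-sample answer accuracy

    # [batch_size, num_generations] -> [batch_size]: mean accuracy per prompt group
    group_ans_acc_mean = ans_acc.mean(dim=-1)

    # one boolean mask replaces the old per-group prompt_level_filtering() calls
    effective_group_mask = torch.logical_and(
        group_ans_acc_mean > filter_range[0], group_ans_acc_mean < filter_range[1]
    )

    # filtered-out groups stay in the buffer as None placeholders, mirroring
    # `group_with_reward if ... else None` in the diff above
    buffer = [g if effective_group_mask[g] else None for g in range(batch_size)]
    print(f"effective groups: {int(effective_group_mask.sum())}/{batch_size}")

Keeping None placeholders in the buffer is what makes the effective_group_to_raw_group_mapping step necessary: training indexes only the effective groups, while the raw positions are preserved for bookkeeping.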
applications/ColossalChat/coati/distributed/grpo_consumer.py

Lines changed: 1 addition & 33 deletions

@@ -84,7 +84,6 @@ def __init__(
         self.project_name = project_name
         self.effective_sample_count = 0
         self.effective_prompt_count = 0
-        self.total_sample_count = 0
         self.project_name = project_name
         self.run_name = run_name
         self.wandb_group_name = wandb_group_name
@@ -429,11 +428,9 @@ def _criterion(outputs, inputs):
                self.optimizer.step()
                self.optimizer.zero_grad()
                self.global_step += 1
-                # no need to run all_reduce_sum on total_sample_count, because all dp ranks recieves a complete inference batch from all producers.
-                sample_utilization = self.effective_sample_count / self.total_sample_count
+                sample_utilization = self.effective_sample_count / len(self.raw_train_batch_reward) / self.num_generations
                self.effective_prompt_count = 0
                self.effective_sample_count = 0
-                self.total_sample_count = 0
                loss_scalar = self.accum_loss.item()
                if not self.plugin.pp_size > 1 or (
                    self.plugin.pp_size > 1 and self.booster.plugin.stage_manager.is_last_stage() and self.tp_rank == 0
@@ -520,35 +517,6 @@ def calculate_reward(self, rollout: Dict[str, Any]) -> Dict[str, Any]:
         rollout["ans_acc"] = ans_acc.view((-1, 1))
         return rollout

-    def prompt_level_filtering(self, rollout_group: Dict[str, Any]) -> Dict[str, Any]:
-        """
-        rollout_group: Dict[str, Any]
-            a group of samples generated by the model from the same prompt
-            contain the following keys:
-            "input_ids": torch.Tensor, [num_of_generation, prompt_length + response_length]
-            "attention_mask": torch.Tensor, [num_of_generation, prompt_length + response_length]
-            "action_mask": torch.Tensor, [num_of_generation, response_length]
-            "action_log_probs": torch.Tensor, [num_of_generation, response_length]
-            "response_idx": int, torch.Tensor, [num_of_generation, 2]
-            "gt_answer": torch.Tensor, [num_of_generation, 128]
-            "temperature": torch.Tensor, [] (scalar)
-            "reward": torch.Tensor, [num_of_generation]
-            "format_acc": torch.Tensor, [num_of_generation]
-            "ans_acc": torch.Tensor, [num_of_generation]
-        """
-        self.total_sample_count += rollout_group["input_ids"].size(0)
-        if self.filter_range is not None:
-            # filter prompt whoes accuracy is too high or too low (out of range)
-            group_ans_acc = torch.mean(rollout_group["ans_acc"])
-            if group_ans_acc < self.filter_range[0] or group_ans_acc > self.filter_range[1]:
-                # filter out the prompt
-                return None
-            else:
-                return rollout_group
-        else:
-            # no filter
-            return rollout_group
-
     def state_dict(self):
         self.policy_model._force_wait_all_gather()
         model = self.policy_model.unwrap()

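With prompt_level_filtering and the separately tracked total_sample_count both removed, sample utilization is now derived from the number of received prompt groups times the group size. A rough sketch of the new arithmetic (the counts are invented for illustration; raw_train_batch_reward is the per-prompt list named in the diff):

    # Invented example counts; one prompt group = num_generations samples.
    effective_sample_count = 96   # samples that survived the effective-group mask
    num_prompt_groups = 16        # plays the role of len(self.raw_train_batch_reward)
    num_generations = 8

    # every received prompt contributes exactly num_generations samples, so the
    # denominator needs no separately accumulated (or all-reduced) sample count
    sample_utilization = effective_sample_count / num_prompt_groups / num_generations
    print(f"sample utilization: {sample_utilization:.2f}")  # -> 0.75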
applications/ColossalChat/coati/distributed/producer.py

Lines changed: 1 addition & 2 deletions

@@ -248,11 +248,10 @@ def loop(self) -> None:
                     self.eval_mode = False
                     self.latest_eval_step = self.consumer_global_step
                 outputs = self.rollout(**batch)
-
-                print(f"[P{self.producer_idx}] Send data {[(k, v.shape) for k, v in outputs.items()]}")
                 outputs["temperature"] = torch.tensor(
                     [self.model.generate_config["temperature"]] * outputs["input_ids"].size(0)
                 ).to(outputs["input_ids"].device)
+                print(f"[P{self.producer_idx}] Send data {[(k, v.shape) for k, v in outputs.items()]}")
                 outputs = pre_send(outputs)
                 ray_broadcast_tensor_dict(
                     outputs, src=0, device=self.device, group_name=f"sync_data_{self.producer_idx}"

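The producer change only reorders the logging: printing after the "temperature" key is attached means the logged shapes cover every tensor that is actually broadcast. A small illustration (the temperature value and shapes here are made up):

    import torch

    outputs = {"input_ids": torch.zeros(4, 128, dtype=torch.long)}  # stand-in rollout output
    outputs["temperature"] = torch.tensor([0.7] * outputs["input_ids"].size(0))

    # logging after the key is added also reports ('temperature', torch.Size([4]))
    print(f"Send data {[(k, v.shape) for k, v in outputs.items()]}")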