
Commit 28688e7

[reward] fix: preserve input non_tensor_batch in AgentLoopManager when reward_loop_worker_handles is None (verl-project#5195)
### What does this PR do?

### Problem

When the codebase is updated to [2cd9283](verl-project@2cd9283) (migration to the new asynchronous reward manager), using **colocate RM** with async rollout (`AgentLoopManager`) causes validation to fail with `KeyError: 'data_source'`.

- **Where:** `verl/experimental/reward_loop/reward_manager/naive.py`, line 42, in `run_single`, which accesses `data_item.non_tensor_batch["data_source"]`.
- **Call path:** `_validate` → `_compute_reward_colocate(test_output_gen_batch_padded)` → `reward_loop_manager.compute_rm_score(batch)` → `RewardLoopWorker.compute_score_batch` → `compute_score` → `run_single`.
- **Cause:** When `reward_loop_worker_handles is None` (e.g., colocate RM), `AgentLoopManager.generate_sequences` returns a `DataProto` whose `non_tensor_batch` is built only from agent outputs (`__num_turns__`, `multi_modal_inputs`, `raw_prompt`). Input metadata such as `data_source` is never forwarded, so the batch passed to the reward manager is missing `data_source` and the naive reward manager raises `KeyError: 'data_source'`.

(Screenshot: validation traceback ending in `KeyError: 'data_source'`.)

### Solution

- Pass the input batch's `non_tensor_batch` into `_postprocess` as the `input_non_tensor_batch` keyword argument.
- When `reward_loop_worker_handles is None`, merge it into the output `non_tensor_batch` so that `data_source` and the other input keys are preserved.
- Colocate RM / validation then receives a batch that includes `data_source`, and the `KeyError` is fixed.
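To make the failure mode concrete, here is a minimal, self-contained sketch (not verl code; `run_single_like` is a hypothetical stand-in for the naive reward manager's `run_single`) showing why a `non_tensor_batch` built only from agent-loop outputs breaks the colocate-RM path:

```python
import numpy as np

def run_single_like(non_tensor_batch: dict):
    # Stand-in for the naive reward manager's run_single: it assumes the
    # key is present and indexes it directly.
    return non_tensor_batch["data_source"]

# Before the fix, the output batch's non_tensor_batch holds only
# agent-loop fields such as __num_turns__; input metadata is absent.
output_non_tensor_batch = {
    "__num_turns__": np.array([2, 3], dtype=np.int32),
}

try:
    run_single_like(output_non_tensor_batch)
except KeyError as err:
    print(f"validation fails with KeyError: {err}")  # KeyError: 'data_source'
```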
1 parent d8561c2 · commit 28688e7

File tree

1 file changed (+8 −2)


verl/experimental/agent_loop/agent_loop.py

Lines changed: 8 additions & 2 deletions
```diff
@@ -469,7 +469,7 @@ async def generate_sequences(self, batch: DataProto) -> DataProto:
         )
         outputs = await asyncio.gather(*tasks)
 
-        output = self._postprocess(outputs)
+        output = self._postprocess(outputs, input_non_tensor_batch=batch.non_tensor_batch)
 
         return output
 
@@ -717,7 +717,11 @@ async def _compute_score(self, output, prompts, responses, attention_mask, input
         output.reward_score = result["reward_score"]
         output.extra_fields["reward_extra_info"] = result["reward_extra_info"]
 
-    def _postprocess(self, inputs: list[_InternalAgentLoopOutput]) -> DataProto:
+    def _postprocess(
+        self,
+        inputs: list[_InternalAgentLoopOutput],
+        input_non_tensor_batch: dict | None = None,
+    ) -> DataProto:
         """Process the padded outputs from _run_agent_loop and combine them into a batch."""
         # Convert lists back to tensors and stack them to create a batch.
         prompt_ids = torch.cat([input.prompt_ids for input in inputs], dim=0)
@@ -757,6 +761,8 @@ def _postprocess(self, inputs: list[_InternalAgentLoopOutput]) -> DataProto:
         non_tensor_batch = {
             "__num_turns__": np.array([input.num_turns for input in inputs], dtype=np.int32),
         }
+        if self.reward_loop_worker_handles is None and input_non_tensor_batch:
+            non_tensor_batch.update(input_non_tensor_batch)
 
         # add reward_extra_info to non_tensor_batch
         reward_extra_infos = [input.extra_fields.get("reward_extra_info", {}) for input in inputs]
```
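For reference, a standalone sketch of the merge semantics the patch relies on (plain `dict.update`, applied only on the colocate-RM path where `reward_loop_worker_handles is None`); the dictionary contents below are illustrative, not verl's internal state:

```python
import numpy as np

# Output-side keys built by _postprocess before the merge (per the diff above).
non_tensor_batch = {
    "__num_turns__": np.array([2, 3], dtype=np.int32),
}

# Input-side metadata that previously got dropped; values are made up here.
input_non_tensor_batch = {
    "data_source": np.array(["openai/gsm8k", "openai/gsm8k"], dtype=object),
    "raw_prompt": np.array(["p0", "p1"], dtype=object),
}

reward_loop_worker_handles = None  # colocate RM: no dedicated reward-loop workers

# Same guard and merge as the patch: dict.update carries the input keys through,
# overwriting an output key only if the names collide.
if reward_loop_worker_handles is None and input_non_tensor_batch:
    non_tensor_batch.update(input_non_tensor_batch)

print(sorted(non_tensor_batch))
# ['__num_turns__', 'data_source', 'raw_prompt']
```

Because the merge is guarded by `reward_loop_worker_handles is None`, the non-colocate path's output batch is unchanged; on a key collision, `dict.update` lets the input batch's value win.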
