|
1 | 1 | from __future__ import annotations |
2 | 2 |
|
| 3 | +import json |
| 4 | +from copy import deepcopy |
| 5 | +from typing import Any |
| 6 | + |
3 | 7 | from ..handoffs import HandoffInputData |
4 | 8 | from ..items import ( |
5 | 9 | HandoffCallItem, |
6 | 10 | HandoffOutputItem, |
| 11 | + ItemHelpers, |
7 | 12 | ReasoningItem, |
8 | 13 | RunItem, |
9 | 14 | ToolCallItem, |
@@ -34,6 +39,102 @@ def remove_all_tools(handoff_input_data: HandoffInputData) -> HandoffInputData: |
34 | 39 | ) |
35 | 40 |
|
36 | 41 |
|
| 42 | +def nest_handoff_history(handoff_input_data: HandoffInputData) -> HandoffInputData: |
| 43 | + """Summarizes the previous transcript into a developer message for the next agent.""" |
| 44 | + |
| 45 | + normalized_history = _normalize_input_history(handoff_input_data.input_history) |
| 46 | + pre_items_as_inputs = [ |
| 47 | + _run_item_to_plain_input(item) for item in handoff_input_data.pre_handoff_items |
| 48 | + ] |
| 49 | + new_items_as_inputs = [_run_item_to_plain_input(item) for item in handoff_input_data.new_items] |
| 50 | + transcript = normalized_history + pre_items_as_inputs + new_items_as_inputs |
| 51 | + |
| 52 | + developer_message = _build_developer_message(transcript) |
| 53 | + latest_user = _find_latest_user_turn(transcript) |
| 54 | + history_items: list[TResponseInputItem] = [developer_message] |
| 55 | + if latest_user is not None: |
| 56 | + history_items.append(latest_user) |
| 57 | + |
| 58 | + filtered_pre_items = tuple( |
| 59 | + item |
| 60 | + for item in handoff_input_data.pre_handoff_items |
| 61 | + if _get_run_item_role(item) != "assistant" |
| 62 | + ) |
| 63 | + |
| 64 | + return handoff_input_data.clone( |
| 65 | + input_history=tuple(history_items), |
| 66 | + pre_handoff_items=filtered_pre_items, |
| 67 | + ) |
| 68 | + |
| 69 | + |
| 70 | +def _normalize_input_history( |
| 71 | + input_history: str | tuple[TResponseInputItem, ...], |
| 72 | +) -> list[TResponseInputItem]: |
| 73 | + if isinstance(input_history, str): |
| 74 | + return ItemHelpers.input_to_new_input_list(input_history) |
| 75 | + return [deepcopy(item) for item in input_history] |
| 76 | + |
| 77 | + |
| 78 | +def _run_item_to_plain_input(run_item: RunItem) -> TResponseInputItem: |
| 79 | + return deepcopy(run_item.to_input_item()) |
| 80 | + |
| 81 | + |
| 82 | +def _build_developer_message(transcript: list[TResponseInputItem]) -> TResponseInputItem: |
| 83 | + if transcript: |
| 84 | + summary_lines = [ |
| 85 | + f"{idx + 1}. {_format_transcript_item(item)}" for idx, item in enumerate(transcript) |
| 86 | + ] |
| 87 | + else: |
| 88 | + summary_lines = ["(no previous turns recorded)"] |
| 89 | + |
| 90 | + content = "Previous conversation before this handoff:\n" + "\n".join(summary_lines) |
| 91 | + return {"role": "developer", "content": content} |
| 92 | + |
| 93 | + |
| 94 | +def _format_transcript_item(item: TResponseInputItem) -> str: |
| 95 | + role = item.get("role") |
| 96 | + if isinstance(role, str): |
| 97 | + prefix = role |
| 98 | + name = item.get("name") |
| 99 | + if isinstance(name, str) and name: |
| 100 | + prefix = f"{prefix} ({name})" |
| 101 | + content_str = _stringify_content(item.get("content")) |
| 102 | + return f"{prefix}: {content_str}" if content_str else prefix |
| 103 | + |
| 104 | + item_type = item.get("type", "item") |
| 105 | + rest = {k: v for k, v in item.items() if k != "type"} |
| 106 | + try: |
| 107 | + serialized = json.dumps(rest, ensure_ascii=False, default=str) |
| 108 | + except TypeError: |
| 109 | + serialized = str(rest) |
| 110 | + return f"{item_type}: {serialized}" if serialized else str(item_type) |
| 111 | + |
| 112 | + |
| 113 | +def _stringify_content(content: Any) -> str: |
| 114 | + if content is None: |
| 115 | + return "" |
| 116 | + if isinstance(content, str): |
| 117 | + return content |
| 118 | + try: |
| 119 | + return json.dumps(content, ensure_ascii=False, default=str) |
| 120 | + except TypeError: |
| 121 | + return str(content) |
| 122 | + |
| 123 | + |
| 124 | +def _find_latest_user_turn( |
| 125 | + transcript: list[TResponseInputItem], |
| 126 | +) -> TResponseInputItem | None: |
| 127 | + for item in reversed(transcript): |
| 128 | + if item.get("role") == "user": |
| 129 | + return deepcopy(item) |
| 130 | + return None |
| 131 | + |
| 132 | + |
| 133 | +def _get_run_item_role(run_item: RunItem) -> str | None: |
| 134 | + role_candidate = run_item.to_input_item().get("role") |
| 135 | + return role_candidate if isinstance(role_candidate, str) else None |
| 136 | + |
| 137 | + |
37 | 138 | def _remove_tools_from_items(items: tuple[RunItem, ...]) -> tuple[RunItem, ...]: |
38 | 139 | filtered_items = [] |
39 | 140 | for item in items: |
|
0 commit comments