Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/agents/memory/openai_responses_compaction_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ def __init__(
self._compaction_candidate_items: list[TResponseInputItem] | None = None
self._session_items: list[TResponseInputItem] | None = None
self._response_id: str | None = None
self._deferred_response_id: str | None = None

@property
def client(self) -> AsyncOpenAI:
Expand Down Expand Up @@ -153,6 +154,7 @@ async def run_compaction(self, args: OpenAIResponsesCompactionArgs | None = None
logger.debug(f"skip: decision hook declined compaction for {self._response_id}")
return

self._deferred_response_id = None
logger.debug(f"compact: start for {self._response_id} using {self.model}")

compacted = await self.client.responses.compact(
Expand Down Expand Up @@ -187,6 +189,26 @@ async def run_compaction(self, args: OpenAIResponsesCompactionArgs | None = None
async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
return await self.underlying_session.get_items(limit)

async def _defer_compaction(self, response_id: str) -> None:
if self._deferred_response_id is not None:
return
compaction_candidate_items, session_items = await self._ensure_compaction_candidates()
should_compact = self.should_trigger_compaction(
{
"response_id": response_id,
"compaction_candidate_items": compaction_candidate_items,
"session_items": session_items,
}
)
if should_compact:
self._deferred_response_id = response_id

def _get_deferred_compaction_response_id(self) -> str | None:
return self._deferred_response_id

def _clear_deferred_compaction(self) -> None:
self._deferred_response_id = None

async def add_items(self, items: list[TResponseInputItem]) -> None:
await self.underlying_session.add_items(items)
if self._compaction_candidate_items is not None:
Expand All @@ -207,6 +229,7 @@ async def clear_session(self) -> None:
await self.underlying_session.clear_session()
self._compaction_candidate_items = []
self._session_items = []
self._deferred_response_id = None

async def _ensure_compaction_candidates(
self,
Expand Down
29 changes: 28 additions & 1 deletion src/agents/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,14 @@
from .handoffs import Handoff, HandoffHistoryMapper, HandoffInputFilter, handoff
from .items import (
HandoffCallItem,
HandoffOutputItem,
ItemHelpers,
ModelResponse,
ReasoningItem,
RunItem,
ToolCallItem,
ToolCallItemTypes,
ToolCallOutputItem,
TResponseInputItem,
)
from .lifecycle import AgentHooksBase, RunHooks, RunHooksBase
Expand Down Expand Up @@ -2094,7 +2096,32 @@ async def _save_result_to_session(

# Run compaction if session supports it and we have a response_id
if response_id and is_openai_responses_compaction_aware_session(session):
await session.run_compaction({"response_id": response_id})
has_local_tool_outputs = any(
isinstance(item, (ToolCallOutputItem, HandoffOutputItem)) for item in new_items
)
if has_local_tool_outputs:
defer_compaction = getattr(session, "_defer_compaction", None)
if callable(defer_compaction):
result = defer_compaction(response_id)
if inspect.isawaitable(result):
await result
logger.debug(
"skip: deferring compaction for response %s due to local tool outputs",
response_id,
)
return
deferred_response_id = None
get_deferred = getattr(session, "_get_deferred_compaction_response_id", None)
if callable(get_deferred):
deferred_response_id = get_deferred()
force_compaction = deferred_response_id is not None
if force_compaction:
logger.debug(
"compact: forcing for response %s after deferred %s",
response_id,
deferred_response_id,
)
await session.run_compaction({"response_id": response_id, "force": force_compaction})

@staticmethod
async def _input_guardrail_tripwire_triggered_for_stream(
Expand Down
114 changes: 113 additions & 1 deletion tests/memory/test_openai_responses_compaction_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
select_compaction_candidate_items,
)
from tests.fake_model import FakeModel
from tests.test_responses import get_text_message
from tests.test_responses import get_function_tool, get_function_tool_call, get_text_message
from tests.utils.simple_session import SimpleListSession


Expand Down Expand Up @@ -289,6 +289,118 @@ async def test_compaction_runs_during_runner_flow(self) -> None:
items = await session.get_items()
assert any(isinstance(item, dict) and item.get("type") == "compaction" for item in items)

@pytest.mark.asyncio
async def test_compaction_skips_when_tool_outputs_present(self) -> None:
underlying = SimpleListSession()
mock_client = MagicMock()
mock_client.responses.compact = AsyncMock()

session = OpenAIResponsesCompactionSession(
session_id="demo",
underlying_session=underlying,
client=mock_client,
should_trigger_compaction=lambda ctx: True,
)

tool = get_function_tool(name="do_thing", return_value="done")
model = FakeModel(initial_output=[get_function_tool_call("do_thing")])
agent = Agent(
name="assistant",
model=model,
tools=[tool],
tool_use_behavior="stop_on_first_tool",
)

await Runner.run(agent, "hello", session=session)

mock_client.responses.compact.assert_not_called()

@pytest.mark.asyncio
async def test_compaction_runs_after_deferred_tool_outputs_when_due(self) -> None:
underlying = SimpleListSession()
compacted = SimpleNamespace(
output=[{"type": "compaction", "summary": "compacted"}],
)
mock_client = MagicMock()
mock_client.responses.compact = AsyncMock(return_value=compacted)

def should_trigger_compaction(context: dict[str, Any]) -> bool:
return any(
isinstance(item, dict) and item.get("type") == "function_call_output"
for item in context["session_items"]
)

session = OpenAIResponsesCompactionSession(
session_id="demo",
underlying_session=underlying,
client=mock_client,
should_trigger_compaction=should_trigger_compaction,
)

tool = get_function_tool(name="do_thing", return_value="done")
model = FakeModel()
model.add_multiple_turn_outputs(
[
[get_function_tool_call("do_thing")],
[get_text_message("ok")],
]
)
agent = Agent(
name="assistant",
model=model,
tools=[tool],
tool_use_behavior="stop_on_first_tool",
)

await Runner.run(agent, "hello", session=session)
await Runner.run(agent, "followup", session=session)

mock_client.responses.compact.assert_awaited_once()

@pytest.mark.asyncio
async def test_deferred_compaction_persists_across_tool_turns(self) -> None:
underlying = SimpleListSession()
compacted = SimpleNamespace(
output=[{"type": "compaction", "summary": "compacted"}],
)
mock_client = MagicMock()
mock_client.responses.compact = AsyncMock(return_value=compacted)

should_compact_calls = {"count": 0}

def should_trigger_compaction(context: dict[str, Any]) -> bool:
should_compact_calls["count"] += 1
return should_compact_calls["count"] == 1

session = OpenAIResponsesCompactionSession(
session_id="demo",
underlying_session=underlying,
client=mock_client,
should_trigger_compaction=should_trigger_compaction,
)

tool = get_function_tool(name="do_thing", return_value="done")
model = FakeModel()
model.add_multiple_turn_outputs(
[
[get_function_tool_call("do_thing")],
[get_function_tool_call("do_thing")],
[get_text_message("ok")],
]
)
agent = Agent(
name="assistant",
model=model,
tools=[tool],
tool_use_behavior="stop_on_first_tool",
)

await Runner.run(agent, "hello", session=session)
await Runner.run(agent, "again", session=session)
await Runner.run(agent, "final", session=session)

mock_client.responses.compact.assert_awaited_once()


class TestTypeGuard:
def test_is_compaction_aware_session_true(self) -> None:
Expand Down