Skip to content

Allow to pass both session and input list #1298

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 38 additions & 13 deletions src/agents/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import copy
import inspect
from dataclasses import dataclass, field
from typing import Any, Generic, cast
from typing import Any, Generic, Literal, cast

from openai.types.responses import ResponseCompletedEvent
from openai.types.responses.response_prompt_param import (
Expand Down Expand Up @@ -139,6 +139,11 @@ class RunConfig:
An optional dictionary of additional metadata to include with the trace.
"""

session_input_handling: Literal["replace", "append"] | None = None
"""If a custom input list is given together with the Session, it will
be appended to the session messages or it will replace them.
"""


class RunOptions(TypedDict, Generic[TContext]):
"""Arguments for ``AgentRunner`` methods."""
Expand Down Expand Up @@ -343,7 +348,9 @@ async def run(
run_config = RunConfig()

# Prepare input with session if enabled
prepared_input = await self._prepare_input_with_session(input, session)
prepared_input = await self._prepare_input_with_session(
input, session, run_config.session_input_handling
)

tool_use_tracker = AgentToolUseTracker()

Expand Down Expand Up @@ -468,7 +475,9 @@ async def run(
)

# Save the conversation to session if enabled
await self._save_result_to_session(session, input, result)
await self._save_result_to_session(
session, input, result, run_config.session_input_handling
)

return result
elif isinstance(turn_result.next_step, NextStepHandoff):
Expand Down Expand Up @@ -662,7 +671,9 @@ async def _start_streaming(

try:
# Prepare input with session if enabled
prepared_input = await AgentRunner._prepare_input_with_session(starting_input, session)
prepared_input = await AgentRunner._prepare_input_with_session(
starting_input, session, run_config.session_input_handling
)

# Update the streamed result with the prepared input
streamed_result.input = prepared_input
Expand Down Expand Up @@ -781,7 +792,7 @@ async def _start_streaming(
context_wrapper=context_wrapper,
)
await AgentRunner._save_result_to_session(
session, starting_input, temp_result
session, starting_input, temp_result, run_config.session_input_handling
)

streamed_result._event_queue.put_nowait(QueueCompleteSentinel())
Expand Down Expand Up @@ -1191,18 +1202,18 @@ async def _prepare_input_with_session(
cls,
input: str | list[TResponseInputItem],
session: Session | None,
session_input_handling: Literal["replace", "append"] | None,
) -> str | list[TResponseInputItem]:
"""Prepare input by combining it with session history if enabled."""
if session is None:
return input

# Validate that we don't have both a session and a list input, as this creates
# ambiguity about whether the list should append to or replace existing session history
if isinstance(input, list):
# If the user doesn't explicitly specify a mode, raise an error
if isinstance(input, list) and not session_input_handling:
raise UserError(
"Cannot provide both a session and a list of input items. "
"When using session memory, provide only a string input to append to the "
"conversation, or use session=None and provide a list to manually manage "
"You must specify the `session_input_handling` in the `RunConfig`. "
"Otherwise, when using session memory, provide only a string input to append to "
"the conversation, or use session=None and provide a list to manually manage "
"conversation history."
)

Expand All @@ -1212,8 +1223,17 @@ async def _prepare_input_with_session(
# Convert input to list format
new_input_list = ItemHelpers.input_to_new_input_list(input)

# Combine history with new input
combined_input = history + new_input_list
if session_input_handling == "append" or session_input_handling is None:
Copy link
Member

@seratch seratch Jul 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me, allowing to pass callback function here to combine the item list to save sounds more flexible. @rm-openai what do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this would offer much more flexibility.
However, would you still preserve the option for the users to specify "replace" or "append" in order to apply some predefined processing?

In that scenario, the RunConfig attribute could accept a Callable, a str or None.

Or would you prefer to have a more complex implementation like the one done for MCP filters (ToolFilterCallable, etc)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sessions are supposed to be used for maintaining conversation history across multiple agent runs. So, I don't think "replace" is a common use case. If "replace" works well, do you really need a session for the case?

If a developer would like to make some adjustments to stored history based on some conditions, that could be a valid use case (I still think it's an edge case though). That's why I thought enabling callback functions here would make more sense.

Let me know if I am missing anything!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. And what should happen if the callback function is left unspecified (None) and the input is a list? We may raise an error, as in the current implementation.

Moreover, does the callback function only take the history and the list of new items as input, or should I allow additional parameters?

# Append new input to history
combined_input = history + new_input_list
elif session_input_handling == "replace":
# Replace history with new input
combined_input = new_input_list
else:
raise UserError(
"The specified `session_input_handling` is not available. "
"Choose between `append`, `replace` or `None`."
)

return combined_input

Expand All @@ -1223,11 +1243,16 @@ async def _save_result_to_session(
session: Session | None,
original_input: str | list[TResponseInputItem],
result: RunResult,
saving_mode: Literal["replace", "append"] | None = None,
) -> None:
"""Save the conversation turn to session."""
if session is None:
return

# Remove old history
if saving_mode == "replace":
await session.clear_session()

# Convert original input to list format if needed
input_list = ItemHelpers.input_to_new_input_list(original_input)

Expand Down
123 changes: 121 additions & 2 deletions tests/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import pytest

from agents import Agent, Runner, SQLiteSession, TResponseInputItem
from agents import Agent, RunConfig, Runner, SQLiteSession, TResponseInputItem
from agents.exceptions import UserError

from .fake_model import FakeModel
Expand Down Expand Up @@ -394,7 +394,126 @@ async def test_session_memory_rejects_both_session_and_list_input(runner_method)
await run_agent_async(runner_method, agent, list_input, session=session)

# Verify the error message explains the issue
assert "Cannot provide both a session and a list of input items" in str(exc_info.value)
assert "You must specify the `session_input_handling` in" in str(exc_info.value)
assert "manually manage conversation history" in str(exc_info.value)

session.close()


@pytest.mark.parametrize("runner_method", ["run", "run_sync", "run_streamed"])
@pytest.mark.asyncio
async def test_session_memory_append_list(runner_method):
"""Test if the user passes a list of items and want to append them."""
with tempfile.TemporaryDirectory() as temp_dir:
db_path = Path(temp_dir) / "test_memory.db"

model = FakeModel()
agent = Agent(name="test", model=model)

# Session
session_id = "session_1"
session = SQLiteSession(session_id, db_path)

model.set_next_output([get_text_message("I like cats")])
_ = await run_agent_async(runner_method, agent, "I like cats", session=session)

append_input = [
{"role": "user", "content": "Some random user text"},
{"role": "assistant", "content": "You're right"},
{"role": "user", "content": "What did I say I like?"},
]
second_model_response = {"role": "assistant", "content": "Yes, you mentioned cats"}
model.set_next_output([get_text_message(second_model_response.get("content", ""))])

_ = await run_agent_async(
runner_method,
agent,
append_input,
session=session,
run_config=RunConfig(session_input_handling="append"),
)

session_items = await session.get_items()

# Check the items has been appended
assert len(session_items) == 6

# Check the items are the last 4 elements
append_input.append(second_model_response)
for sess_item, orig_item in zip(session_items[-4:], append_input):
assert sess_item.get("role") == orig_item.get("role")

sess_content = sess_item.get("content")
# Narrow to list or str for mypy
assert isinstance(sess_content, (list, str))

if isinstance(sess_content, list):
# now mypy knows `content: list[Any]`
assert isinstance(sess_content[0], dict) and "text" in sess_content[0]
val_sess = sess_content[0]["text"]
else:
# here content is str
val_sess = sess_content

assert val_sess == orig_item["content"]

session.close()


@pytest.mark.parametrize("runner_method", ["run", "run_sync", "run_streamed"])
@pytest.mark.asyncio
async def test_session_memory_replace_list(runner_method):
"""Test if the user passes a list of items and want to replace the history."""
with tempfile.TemporaryDirectory() as temp_dir:
db_path = Path(temp_dir) / "test_memory.db"

model = FakeModel()
agent = Agent(name="test", model=model)

# Session
session_id = "session_1"
session = SQLiteSession(session_id, db_path)

model.set_next_output([get_text_message("I like cats")])
_ = await run_agent_async(runner_method, agent, "I like cats", session=session)

replace_input = [
{"role": "user", "content": "Some random user text"},
{"role": "assistant", "content": "You're right"},
{"role": "user", "content": "What did I say I like?"},
]
second_model_response = {"role": "assistant", "content": "Yes, you mentioned cats"}
model.set_next_output([get_text_message(second_model_response.get("content", ""))])

_ = await run_agent_async(
runner_method,
agent,
replace_input,
session=session,
run_config=RunConfig(session_input_handling="replace"),
)

session_items = await session.get_items()

# Check the new items replaced the history
assert len(session_items) == 4

# Check the items are the last 4 elements
replace_input.append(second_model_response)
for sess_item, orig_item in zip(session_items, replace_input):
assert sess_item.get("role") == orig_item.get("role")
sess_content = sess_item.get("content")
# Narrow to list or str for mypy
assert isinstance(sess_content, (list, str))

if isinstance(sess_content, list):
# now mypy knows `content: list[Any]`
assert isinstance(sess_content[0], dict) and "text" in sess_content[0]
val_sess = sess_content[0]["text"]
else:
# here content is str
val_sess = sess_content

assert val_sess == orig_item["content"]

session.close()