Skip to content

Commit 3e415c0

Browse files
committed
implement structured storage with additional tables for messages and tool calls
1 parent 043b3d6 commit 3e415c0

File tree

4 files changed

+587
-8
lines changed

4 files changed

+587
-8
lines changed

docs/sessions.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,38 @@ result = await Runner.run(
141141
)
142142
```
143143

144+
### Structured storage
145+
146+
By default, SQLiteSession stores all conversation events as JSON blobs in a single table. You can enable structured storage to create additional tables for messages and tool calls:
147+
148+
```python
149+
from agents import SQLiteSession
150+
151+
# Enable structured storage
152+
session = SQLiteSession(
153+
"user_123",
154+
"conversations.db",
155+
structured=True
156+
)
157+
158+
# This creates additional tables:
159+
# - agent_conversation_messages: stores user, assistant, system messages
160+
# - agent_tool_calls: stores tool call requests and outputs
161+
```
162+
163+
With structured storage enabled, you can query conversations using standard SQL:
164+
165+
```sql
166+
-- Get all user messages in a session
167+
SELECT content FROM agent_conversation_messages
168+
WHERE session_id = 'user_123' AND role = 'user';
169+
170+
-- Get all tool calls and their results
171+
SELECT tool_name, arguments, output, status
172+
FROM agent_tool_calls
173+
WHERE session_id = 'user_123';
174+
```
175+
144176
### Multiple sessions
145177

146178
```python
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
"""A script to test and demonstrate the structured session storage feature."""
2+
3+
import asyncio
4+
import random
5+
import sqlite3
6+
7+
from agents import Agent, Runner, SQLiteSession, function_tool
8+
9+
10+
async def main():
11+
# Create a tool
12+
@function_tool
13+
def get_random_number(max_val: int) -> int:
14+
"""Get a random number between 0 and max_val."""
15+
return random.randint(0, max_val)
16+
17+
# Create an agent
18+
agent = Agent(
19+
name="Assistant",
20+
instructions="Reply very concisely. When using tools, explain what you're doing.",
21+
tools=[get_random_number],
22+
)
23+
24+
# Create a session with structured storage enabled
25+
db_path = "structured_conversation_demo.db"
26+
session = SQLiteSession("demo_session", db_path, structured=True)
27+
28+
print("=== Structured Session Storage Demo ===")
29+
print("This demo shows structured storage that makes conversations easy to query.\n")
30+
31+
# First turn
32+
print("First turn:")
33+
print("User: Pick a random number between 0 and 100")
34+
result = await Runner.run(
35+
agent,
36+
"Pick a random number between 0 and 100",
37+
session=session
38+
)
39+
print(f"Assistant: {result.final_output}")
40+
print()
41+
42+
# Second turn - the agent will remember the previous conversation
43+
print("Second turn:")
44+
print("User: What number did you pick for me?")
45+
result = await Runner.run(
46+
agent,
47+
"What number did you pick for me?",
48+
session=session
49+
)
50+
print(f"Assistant: {result.final_output}")
51+
print()
52+
53+
# Third turn - another tool call
54+
print("Third turn:")
55+
print("User: Now pick a number between 0 and 50")
56+
result = await Runner.run(
57+
agent,
58+
"Now pick a number between 0 and 50",
59+
session=session
60+
)
61+
print(f"Assistant: {result.final_output}")
62+
print()
63+
64+
print("=== Conversation Complete ===")
65+
print(f"Data stored in: {db_path}")
66+
print()
67+
68+
# Now demonstrate the structured storage benefits
69+
print("=== Structured Storage Analysis ===")
70+
print("With structured storage, you can easily query the conversation:")
71+
print()
72+
73+
conn = sqlite3.connect(db_path)
74+
75+
# Show all messages
76+
print("1. All conversation messages:")
77+
cursor = conn.execute("""
78+
SELECT role, content FROM agent_conversation_messages
79+
WHERE session_id = 'demo_session'
80+
ORDER BY created_at
81+
""")
82+
for role, content in cursor.fetchall():
83+
content_preview = content[:60] + "..." if len(content) > 60 else content
84+
print(f" {role}: {content_preview}")
85+
print()
86+
87+
# Show all tool calls
88+
print("2. All tool calls and results:")
89+
cursor = conn.execute("""
90+
SELECT tool_name, arguments, output, status
91+
FROM agent_tool_calls
92+
WHERE session_id = 'demo_session'
93+
ORDER BY created_at
94+
""")
95+
for tool_name, arguments, output, status in cursor.fetchall():
96+
print(f" Tool: {tool_name}")
97+
print(f" Args: {arguments}")
98+
print(f" Result: {output}")
99+
print(f" Status: {status}")
100+
print()
101+
102+
# Show message count by role
103+
print("3. Message count by role:")
104+
cursor = conn.execute("""
105+
SELECT role, COUNT(*) as count
106+
FROM agent_conversation_messages
107+
WHERE session_id = 'demo_session'
108+
GROUP BY role
109+
""")
110+
for role, count in cursor.fetchall():
111+
print(f" {role}: {count} messages")
112+
print()
113+
114+
conn.close()
115+
session.close()
116+
117+
print("=== Query Examples ===")
118+
print("You can now run SQL queries like:")
119+
print("• SELECT * FROM agent_conversation_messages WHERE role = 'user';")
120+
print("• SELECT tool_name, COUNT(*) FROM agent_tool_calls GROUP BY tool_name;")
121+
print("• SELECT * FROM agent_tool_calls WHERE status = 'completed';")
122+
print()
123+
print("This makes conversation analysis, debugging, and building editing")
124+
print("tools much easier than parsing JSON blobs!")
125+
126+
127+
if __name__ == "__main__":
128+
asyncio.run(main())

src/agents/memory/session.py

Lines changed: 154 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,10 @@ def __init__(
118118
db_path: str | Path = ":memory:",
119119
sessions_table: str = "agent_sessions",
120120
messages_table: str = "agent_messages",
121+
*,
122+
structured: bool = False,
123+
conversation_table: str = "agent_conversation_messages",
124+
tool_calls_table: str = "agent_tool_calls",
121125
):
122126
"""Initialize the SQLite session.
123127
@@ -127,11 +131,20 @@ def __init__(
127131
sessions_table: Name of the table to store session metadata. Defaults to
128132
'agent_sessions'
129133
messages_table: Name of the table to store message data. Defaults to 'agent_messages'
134+
structured: If True, enables structured storage mode, creating
135+
additional tables for messages and tool calls. Defaults to False.
136+
conversation_table: Name for the structured conversation messages table.
137+
Defaults to 'agent_conversation_messages'.
138+
tool_calls_table: Name for the structured tool calls table.
139+
Defaults to 'agent_tool_calls'.
130140
"""
131141
self.session_id = session_id
132142
self.db_path = db_path
133143
self.sessions_table = sessions_table
134144
self.messages_table = messages_table
145+
self.structured = structured
146+
self.conversation_table = conversation_table
147+
self.tool_calls_table = tool_calls_table
135148
self._local = threading.local()
136149
self._lock = threading.Lock()
137150

@@ -141,11 +154,13 @@ def __init__(
141154
if self._is_memory_db:
142155
self._shared_connection = sqlite3.connect(":memory:", check_same_thread=False)
143156
self._shared_connection.execute("PRAGMA journal_mode=WAL")
157+
self._shared_connection.execute("PRAGMA foreign_keys=ON")
144158
self._init_db_for_connection(self._shared_connection)
145159
else:
146160
# For file databases, initialize the schema once since it persists
147161
init_conn = sqlite3.connect(str(self.db_path), check_same_thread=False)
148162
init_conn.execute("PRAGMA journal_mode=WAL")
163+
init_conn.execute("PRAGMA foreign_keys=ON")
149164
self._init_db_for_connection(init_conn)
150165
init_conn.close()
151166

@@ -162,6 +177,7 @@ def _get_connection(self) -> sqlite3.Connection:
162177
check_same_thread=False,
163178
)
164179
self._local.connection.execute("PRAGMA journal_mode=WAL")
180+
self._local.connection.execute("PRAGMA foreign_keys=ON")
165181
assert isinstance(self._local.connection, sqlite3.Connection), (
166182
f"Expected sqlite3.Connection, got {type(self._local.connection)}"
167183
)
@@ -201,6 +217,63 @@ def _init_db_for_connection(self, conn: sqlite3.Connection) -> None:
201217

202218
conn.commit()
203219

220+
# Create additional structured tables if enabled
221+
if getattr(self, "structured", False):
222+
# Conversation messages table
223+
conn.execute(
224+
f"""
225+
CREATE TABLE IF NOT EXISTS {self.conversation_table} (
226+
id INTEGER PRIMARY KEY AUTOINCREMENT,
227+
session_id TEXT NOT NULL,
228+
raw_event_id INTEGER NOT NULL,
229+
role TEXT,
230+
content TEXT,
231+
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
232+
FOREIGN KEY (session_id) REFERENCES {self.sessions_table} (session_id)
233+
ON DELETE CASCADE,
234+
FOREIGN KEY (raw_event_id) REFERENCES {self.messages_table} (id)
235+
ON DELETE CASCADE
236+
)
237+
"""
238+
)
239+
240+
conn.execute(
241+
f"""
242+
CREATE INDEX IF NOT EXISTS idx_{self.conversation_table}_session_id
243+
ON {self.conversation_table} (session_id, created_at)
244+
"""
245+
)
246+
247+
# Tool calls table
248+
conn.execute(
249+
f"""
250+
CREATE TABLE IF NOT EXISTS {self.tool_calls_table} (
251+
id INTEGER PRIMARY KEY AUTOINCREMENT,
252+
session_id TEXT NOT NULL,
253+
raw_event_id INTEGER NOT NULL,
254+
call_id TEXT,
255+
tool_name TEXT,
256+
arguments JSON,
257+
output JSON,
258+
status TEXT,
259+
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
260+
FOREIGN KEY (session_id) REFERENCES {self.sessions_table} (session_id)
261+
ON DELETE CASCADE,
262+
FOREIGN KEY (raw_event_id) REFERENCES {self.messages_table} (id)
263+
ON DELETE CASCADE
264+
)
265+
"""
266+
)
267+
268+
conn.execute(
269+
f"""
270+
CREATE INDEX IF NOT EXISTS idx_{self.tool_calls_table}_session_id
271+
ON {self.tool_calls_table} (session_id, created_at)
272+
"""
273+
)
274+
275+
conn.commit()
276+
204277
async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
205278
"""Retrieve the conversation history for this session.
206279
@@ -278,13 +351,86 @@ def _add_items_sync():
278351
)
279352

280353
# Add items
281-
message_data = [(self.session_id, json.dumps(item)) for item in items]
282-
conn.executemany(
283-
f"""
284-
INSERT INTO {self.messages_table} (session_id, message_data) VALUES (?, ?)
285-
""",
286-
message_data,
287-
)
354+
if not self.structured:
355+
# Flat storage: bulk insert for performance
356+
message_data = [(self.session_id, json.dumps(item)) for item in items]
357+
conn.executemany(
358+
f"""
359+
INSERT INTO {self.messages_table} (session_id, message_data) VALUES (?, ?)
360+
""",
361+
message_data,
362+
)
363+
else:
364+
# Structured storage: insert each item individually so we can capture rowid
365+
for item in items:
366+
raw_json = json.dumps(item)
367+
cursor = conn.execute(
368+
f"""
369+
INSERT INTO {self.messages_table} (session_id, message_data)
370+
VALUES (?, ?)
371+
RETURNING id
372+
""",
373+
(self.session_id, raw_json),
374+
)
375+
raw_event_id = cursor.fetchone()[0]
376+
377+
# Handle structured inserts
378+
if "role" in item:
379+
role = item.get("role")
380+
content_val = item.get("content")
381+
try:
382+
content_str = (
383+
json.dumps(content_val)
384+
if content_val is not None
385+
else None
386+
)
387+
except TypeError:
388+
content_str = str(content_val)
389+
390+
conn.execute(
391+
f"""
392+
INSERT INTO {self.conversation_table}
393+
(session_id, raw_event_id, role, content)
394+
VALUES (?, ?, ?, ?)
395+
""",
396+
(self.session_id, raw_event_id, role, content_str),
397+
)
398+
399+
event_type = item.get("type")
400+
if event_type == "function_call":
401+
call_id = item.get("call_id")
402+
tool_name = item.get("name")
403+
arguments_val = item.get("arguments")
404+
conn.execute(
405+
f"""
406+
INSERT INTO {self.tool_calls_table}
407+
(session_id, raw_event_id, call_id, tool_name, arguments, status)
408+
VALUES (?, ?, ?, ?, ?, ?)
409+
""",
410+
(
411+
self.session_id,
412+
raw_event_id,
413+
call_id,
414+
tool_name,
415+
arguments_val,
416+
item.get("status"),
417+
),
418+
)
419+
elif event_type == "function_call_output":
420+
call_id = item.get("call_id")
421+
output_val = item.get("output")
422+
conn.execute(
423+
f"""
424+
UPDATE {self.tool_calls_table}
425+
SET output = ?, status = 'completed'
426+
WHERE session_id = ? AND call_id = ?
427+
""",
428+
(
429+
json.dumps(output_val) if output_val is not None else None,
430+
self.session_id,
431+
call_id,
432+
),
433+
)
288434

289435
# Update session timestamp
290436
conn.execute(
@@ -326,6 +472,7 @@ def _pop_item_sync():
326472
)
327473

328474
result = cursor.fetchone()
475+
329476
conn.commit()
330477

331478
if result:
@@ -334,7 +481,6 @@ def _pop_item_sync():
334481
item = json.loads(message_data)
335482
return item
336483
except json.JSONDecodeError:
337-
# Return None for corrupted JSON entries (already deleted)
338484
return None
339485

340486
return None

0 commit comments

Comments
 (0)