Skip to content

Commit af03519

Browse files
committed
incorporate feedback, also add new context manager for session to auto-close
Signed-off-by: Filinto Duran <[email protected]>
1 parent f4519df commit af03519

File tree

4 files changed

+189
-140
lines changed

4 files changed

+189
-140
lines changed

docs/sessions/index.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ Use meaningful session IDs that help you organize conversations:
268268
- Use in-memory SQLite (`SQLiteSession("session_id")`) for temporary conversations
269269
- Use file-based SQLite (`SQLiteSession("session_id", "path/to/db.sqlite")`) for persistent conversations
270270
- Use SQLAlchemy-powered sessions (`SQLAlchemySession("session_id", engine=engine, create_tables=True)`) for production systems with existing databases supported by SQLAlchemy
271-
- Use Dapr state store sessions (`DaprSession.from_address("session_id", state_store_name="statestore", dapr_address="localhost:50001")`) for productioncloud-native deployments with support for
271+
- Use Dapr state store sessions (`DaprSession.from_address("session_id", state_store_name="statestore", dapr_address="localhost:50001")`) for production cloud-native deployments with support for
272272
30+ database backends with built-in telemetry, tracing, and data isolation
273273
- Use OpenAI-hosted storage (`OpenAIConversationsSession()`) when you prefer to store history in the OpenAI Conversations API
274274
- Use encrypted sessions (`EncryptedSession(session_id, underlying_session, encryption_key)`) to wrap any session with transparent encryption and TTL-based expiration

examples/memory/dapr_session_example.py

Lines changed: 139 additions & 139 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
WHAT IS DAPR?
99
Dapr (https://dapr.io) is a portable, event-driven runtime that simplifies building
1010
resilient applications. Its state management building block provides a unified API
11-
for storing data across 30+ databases with built-in telemetry, tracing, and data
12-
isolation. See: https://docs.dapr.io/developing-applications/building-blocks/state-management/
11+
for storing data across 30+ databases with built-in telemetry, tracing, encryption, data
12+
isolation and lifecycle management via time-to-live (TTL). See: https://docs.dapr.io/developing-applications/building-blocks/state-management/
1313
1414
WHEN TO USE DaprSession:
1515
- Horizontally scaled deployments (multiple agent instances behind a load balancer)
@@ -30,12 +30,24 @@
3030
- Consistency levels: Eventual (faster) or strong (read-after-write guarantee)
3131
- State encryption: AES-GCM encryption at the Dapr component level
3232
- Cloud-native: Seamless Kubernetes integration (Dapr runs as sidecar)
33+
- Cloud Service Provider (CSP) native authentication and authorization support.
3334
3435
PREREQUISITES:
3536
1. Install Dapr CLI: https://docs.dapr.io/getting-started/install-dapr-cli/
36-
2. Initialize Dapr: dapr init
37-
3. Start Redis: docker run -d -p 6379:6379 redis:7-alpine
38-
4. Create components directory with statestore.yaml configuration
37+
2. Install Docker (for running Redis and optionally Dapr containers)
38+
3. Choose one of the following setups:
39+
40+
Option A - Full Dapr environment (recommended if you plan to use other Dapr features):
41+
- Run: dapr init
42+
- This installs Redis, Zipkin, and Placement service locally
43+
- Useful for workflows, actors, pub/sub, and other Dapr building blocks
44+
45+
Option B - Minimal setup (just for DaprSession):
46+
- Start Redis only: docker run -d -p 6379:6379 redis:7-alpine
47+
- Requires only Dapr CLI (no dapr init needed)
48+
49+
4. Create components directory with statestore.yaml configuration (see setup_instructions())
50+
5. As always, ensure that the OPENAI_API_KEY environment variable is set.
3951
4052
COMMON ISSUES:
4153
- "Health check connection refused (port 3500)": Always use --dapr-http-port 3500
@@ -78,109 +90,108 @@ async def main():
7890
) # noqa: E501
7991
print()
8092

81-
# Create a Dapr session instance
93+
# Create a Dapr session instance with context manager for automatic cleanup
8294
session_id = "dapr_conversation_123"
8395
try:
84-
session = DaprSession.from_address(
96+
# Use async with to automatically close the session on exit
97+
async with DaprSession.from_address(
8598
session_id,
8699
state_store_name="statestore",
87100
dapr_address=f"localhost:{grpc_port}",
88-
)
101+
) as session:
102+
# Test Dapr connectivity
103+
if not await session.ping():
104+
print("Dapr sidecar is not available!")
105+
print("Please start Dapr sidecar and try again.")
106+
print(
107+
"Command: dapr run --app-id myapp --dapr-http-port 3500 --dapr-grpc-port 50001 --resources-path ./components"
108+
) # noqa: E501
109+
return
110+
111+
print("Connected to Dapr successfully!")
112+
print(f"Session ID: {session_id}")
113+
print("State Store: statestore")
114+
115+
# Clear any existing session data for a clean start
116+
await session.clear_session()
117+
print("Session cleared for clean demonstration.")
118+
print("The agent will remember previous messages automatically.\n")
119+
120+
# First turn
121+
print("First turn:")
122+
print("User: What city is the Golden Gate Bridge in?")
123+
result = await Runner.run(
124+
agent,
125+
"What city is the Golden Gate Bridge in?",
126+
session=session,
127+
)
128+
print(f"Assistant: {result.final_output}")
129+
print()
130+
131+
# Second turn - the agent will remember the previous conversation
132+
print("Second turn:")
133+
print("User: What state is it in?")
134+
result = await Runner.run(agent, "What state is it in?", session=session)
135+
print(f"Assistant: {result.final_output}")
136+
print()
137+
138+
# Third turn - continuing the conversation
139+
print("Third turn:")
140+
print("User: What's the population of that state?")
141+
result = await Runner.run(
142+
agent,
143+
"What's the population of that state?",
144+
session=session,
145+
)
146+
print(f"Assistant: {result.final_output}")
147+
print()
89148

90-
# Test Dapr connectivity
91-
if not await session.ping():
92-
print("Dapr sidecar is not available!")
93-
print("Please start Dapr sidecar and try again.")
149+
print("=== Conversation Complete ===")
150+
print("Notice how the agent remembered the context from previous turns!")
94151
print(
95-
"Command: dapr run --app-id myapp --dapr-http-port 3500 --dapr-grpc-port 50001 --resources-path ./components"
96-
) # noqa: E501
97-
return
98-
99-
print("Connected to Dapr successfully!")
100-
print(f"Session ID: {session_id}")
101-
print("State Store: statestore")
102-
103-
# Clear any existing session data for a clean start
104-
await session.clear_session()
105-
print("Session cleared for clean demonstration.")
106-
print("The agent will remember previous messages automatically.\n")
107-
108-
# First turn
109-
print("First turn:")
110-
print("User: What city is the Golden Gate Bridge in?")
111-
result = await Runner.run(
112-
agent,
113-
"What city is the Golden Gate Bridge in?",
114-
session=session,
115-
)
116-
print(f"Assistant: {result.final_output}")
117-
print()
118-
119-
# Second turn - the agent will remember the previous conversation
120-
print("Second turn:")
121-
print("User: What state is it in?")
122-
result = await Runner.run(agent, "What state is it in?", session=session)
123-
print(f"Assistant: {result.final_output}")
124-
print()
125-
126-
# Third turn - continuing the conversation
127-
print("Third turn:")
128-
print("User: What's the population of that state?")
129-
result = await Runner.run(
130-
agent,
131-
"What's the population of that state?",
132-
session=session,
133-
)
134-
print(f"Assistant: {result.final_output}")
135-
print()
136-
137-
print("=== Conversation Complete ===")
138-
print("Notice how the agent remembered the context from previous turns!")
139-
print("Dapr session automatically handles conversation history with backend flexibility.")
140-
141-
# Demonstrate session persistence
142-
print("\n=== Session Persistence Demo ===")
143-
all_items = await session.get_items()
144-
print(f"Total messages stored in Dapr: {len(all_items)}")
145-
146-
# Demonstrate the limit parameter
147-
print("\n=== Latest Items Demo ===")
148-
latest_items = await session.get_items(limit=2)
149-
print("Latest 2 items:")
150-
for i, msg in enumerate(latest_items, 1):
151-
role = msg.get("role", "unknown")
152-
content = msg.get("content", "")
153-
print(f" {i}. {role}: {content}")
154-
155-
# Demonstrate session isolation with a new session
156-
print("\n=== Session Isolation Demo ===")
157-
new_session = DaprSession.from_address(
158-
"different_conversation_456",
159-
state_store_name="statestore",
160-
dapr_address=f"localhost:{grpc_port}",
161-
)
162-
163-
print("Creating a new session with different ID...")
164-
result = await Runner.run(
165-
agent,
166-
"Hello, this is a new conversation!",
167-
session=new_session,
168-
)
169-
print(f"New session response: {result.final_output}")
170-
171-
# Show that sessions are isolated
172-
original_items = await session.get_items()
173-
new_items = await new_session.get_items()
174-
print(f"Original session has {len(original_items)} items")
175-
print(f"New session has {len(new_items)} items")
176-
print("Sessions are completely isolated!")
177-
178-
# Clean up the new session
179-
await new_session.clear_session()
180-
await new_session.close()
181-
182-
# Close the main session
183-
await session.close()
152+
"Dapr session automatically handles conversation history with backend flexibility."
153+
)
154+
155+
# Demonstrate session persistence
156+
print("\n=== Session Persistence Demo ===")
157+
all_items = await session.get_items()
158+
print(f"Total messages stored in Dapr: {len(all_items)}")
159+
160+
# Demonstrate the limit parameter
161+
print("\n=== Latest Items Demo ===")
162+
latest_items = await session.get_items(limit=2)
163+
print("Latest 2 items:")
164+
for i, msg in enumerate(latest_items, 1):
165+
role = msg.get("role", "unknown")
166+
content = msg.get("content", "")
167+
print(f" {i}. {role}: {content}")
168+
169+
# Demonstrate session isolation with a new session
170+
print("\n=== Session Isolation Demo ===")
171+
# Use context manager for the new session too
172+
async with DaprSession.from_address(
173+
"different_conversation_456",
174+
state_store_name="statestore",
175+
dapr_address=f"localhost:{grpc_port}",
176+
) as new_session:
177+
print("Creating a new session with different ID...")
178+
result = await Runner.run(
179+
agent,
180+
"Hello, this is a new conversation!",
181+
session=new_session,
182+
)
183+
print(f"New session response: {result.final_output}")
184+
185+
# Show that sessions are isolated
186+
original_items = await session.get_items()
187+
new_items = await new_session.get_items()
188+
print(f"Original session has {len(original_items)} items")
189+
print(f"New session has {len(new_items)} items")
190+
print("Sessions are completely isolated!")
191+
192+
# Clean up the new session
193+
await new_session.clear_session()
194+
# No need to call close() - context manager handles it automatically!
184195

185196
except Exception as e:
186197
print(f"Error: {e}")
@@ -196,53 +207,45 @@ async def demonstrate_advanced_features():
196207
try:
197208
# TTL (time-to-live) configuration
198209
print("\n1. TTL Configuration:")
199-
ttl_session = DaprSession.from_address(
210+
async with DaprSession.from_address(
200211
"ttl_demo_session",
201212
state_store_name="statestore",
202213
dapr_address=f"localhost:{grpc_port}",
203214
ttl=3600, # 1 hour TTL
204-
)
205-
206-
if await ttl_session.ping():
207-
await Runner.run(
208-
Agent(name="Assistant", instructions="Be helpful"),
209-
"This message will expire in 1 hour",
210-
session=ttl_session,
211-
)
212-
print("Created session with 1-hour TTL - messages will auto-expire")
213-
print("(TTL support depends on the underlying state store)")
214-
215-
await ttl_session.close()
215+
) as ttl_session:
216+
if await ttl_session.ping():
217+
await Runner.run(
218+
Agent(name="Assistant", instructions="Be helpful"),
219+
"This message will expire in 1 hour",
220+
session=ttl_session,
221+
)
222+
print("Created session with 1-hour TTL - messages will auto-expire")
223+
print("(TTL support depends on the underlying state store)")
216224

217225
# Consistency levels
218226
print("\n2. Consistency Levels:")
219227

220228
# Eventual consistency (better performance)
221-
eventual_session = DaprSession.from_address(
229+
async with DaprSession.from_address(
222230
"eventual_session",
223231
state_store_name="statestore",
224232
dapr_address=f"localhost:{grpc_port}",
225233
consistency=DAPR_CONSISTENCY_EVENTUAL,
226-
)
234+
) as eventual_session:
235+
if await eventual_session.ping():
236+
print("Eventual consistency: Better performance, may have slight delays")
237+
await eventual_session.add_items([{"role": "user", "content": "Test eventual"}])
227238

228239
# Strong consistency (guaranteed read-after-write)
229-
strong_session = DaprSession.from_address(
240+
async with DaprSession.from_address(
230241
"strong_session",
231242
state_store_name="statestore",
232243
dapr_address=f"localhost:{grpc_port}",
233244
consistency=DAPR_CONSISTENCY_STRONG,
234-
)
235-
236-
if await eventual_session.ping():
237-
print("Eventual consistency: Better performance, may have slight delays")
238-
await eventual_session.add_items([{"role": "user", "content": "Test eventual"}])
239-
240-
if await strong_session.ping():
241-
print("Strong consistency: Guaranteed immediate consistency")
242-
await strong_session.add_items([{"role": "user", "content": "Test strong"}])
243-
244-
await eventual_session.close()
245-
await strong_session.close()
245+
) as strong_session:
246+
if await strong_session.ping():
247+
print("Strong consistency: Guaranteed immediate consistency")
248+
await strong_session.add_items([{"role": "user", "content": "Test strong"}])
246249

247250
# Multi-tenancy example
248251
print("\n3. Multi-tenancy with Session Prefixes:")
@@ -255,16 +258,12 @@ def get_tenant_session(tenant_id: str, user_id: str) -> DaprSession:
255258
dapr_address=f"localhost:{grpc_port}",
256259
)
257260

258-
tenant_a_session = get_tenant_session("tenant-a", "user-123")
259-
tenant_b_session = get_tenant_session("tenant-b", "user-123")
260-
261-
if await tenant_a_session.ping() and await tenant_b_session.ping():
262-
await tenant_a_session.add_items([{"role": "user", "content": "Tenant A data"}])
263-
await tenant_b_session.add_items([{"role": "user", "content": "Tenant B data"}])
264-
print("Multi-tenant sessions created with isolated data")
265-
266-
await tenant_a_session.close()
267-
await tenant_b_session.close()
261+
async with get_tenant_session("tenant-a", "user-123") as tenant_a_session:
262+
async with get_tenant_session("tenant-b", "user-123") as tenant_b_session:
263+
if await tenant_a_session.ping() and await tenant_b_session.ping():
264+
await tenant_a_session.add_items([{"role": "user", "content": "Tenant A data"}])
265+
await tenant_b_session.add_items([{"role": "user", "content": "Tenant B data"}])
266+
print("Multi-tenant sessions created with isolated data")
268267

269268
except Exception as e:
270269
print(f"Advanced features error: {e}")
@@ -292,6 +291,7 @@ async def setup_instructions():
292291
value: ""
293292
""")
294293
print(" Start Redis: docker run -d -p 6379:6379 redis:7-alpine")
294+
print(" (Skip if you already ran 'dapr init' - it installs Redis locally)")
295295

296296
print("\n OPTION B - PostgreSQL (v2 recommended):")
297297
print("""

src/agents/extensions/memory/dapr_session.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,14 @@ async def close(self) -> None:
341341
if self._owns_client:
342342
await self._dapr_client.close()
343343

344+
async def __aenter__(self) -> DaprSession:
345+
"""Enter async context manager."""
346+
return self
347+
348+
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
349+
"""Exit async context manager and close the connection."""
350+
await self.close()
351+
344352
async def ping(self) -> bool:
345353
"""Test Dapr connectivity by checking metadata.
346354

0 commit comments

Comments
 (0)