Skip to content

Commit 944606c

Browse files
committed
[Temporal - Documentation] Moved it to examples
1 parent 31a5dc8 commit 944606c

File tree

10 files changed

+728
-332
lines changed

10 files changed

+728
-332
lines changed

docs/durable_execution/temporal.md

Lines changed: 9 additions & 332 deletions
Original file line numberDiff line numberDiff line change
@@ -185,343 +185,20 @@ The event stream handler function will receive the agent [run context][pydantic_
185185
As the streaming model request activity, workflow, and workflow execution call all take place in separate processes, passing data between them requires some care:
186186

187187
- To get data from the workflow call site or workflow to the event stream handler, you can use a [dependencies object](#agent-run-context-and-dependencies).
188-
- To get data from the event stream handler to the workflow, workflow call site, or a frontend, you need to use an external system that the event stream handler can write to and the event consumer can read from, like a message queue. You can use the dependency object to make sure the same connection string or other unique ID is available in all the places that need it.
188+
- To get data from the event stream handler to the workflow, workflow call site, or a frontend, you need to use an external system that the event stream handler can write to and the event consumer can read from. Alternatively, you can use Temporal's built-in signals and queries to pass events from activities to the workflow and from the workflow to the caller.
189189

190190
#### Example
191191

192-
Run the following
193-
`pip install pydantic-ai temporalio mcp-run-python`
194-
195-
Assuming your project has the following structure:
196-
```
197-
project/
198-
├── src/
199-
│ ├── agents.py
200-
│ ├── datamodels.py
201-
│ ├── streaming_handler.py
202-
│ ├── utils.py
203-
│ └── workflow.py
204-
└── pyproject.toml
205-
206-
```
207-
208-
```py {title="utils.py" test="skip"}
209-
import os
210-
from copy import copy
211-
212-
import yaml
213-
214-
215-
def recursively_modify_api_key(conf):
216-
"""
217-
Recursively replace API key placeholders with environment variable values.
218-
219-
This function traverses a configuration dictionary and replaces any keys
220-
containing 'api_key' with the corresponding environment variable value.
221-
It handles nested dictionaries and lists recursively.
222-
223-
Args:
224-
conf: The configuration dictionary to process.
225-
226-
Returns:
227-
A copy of the configuration with API keys replaced by environment variable values.
228-
"""
229-
230-
def inner(_conf):
231-
for key, value in _conf.items():
232-
if isinstance(value, dict):
233-
inner(value)
234-
elif isinstance(value, list):
235-
if len(value) > 0 and isinstance(value[0], dict):
236-
for item in value:
237-
inner(item)
238-
else:
239-
_conf[key] = [os.environ.get(str(v), v) for v in value]
240-
elif isinstance(value, str):
241-
_conf[key] = os.environ.get(value, value)
242-
else:
243-
_conf[key] = value
244-
245-
copy_conf = copy(conf)
246-
inner(copy_conf)
247-
return copy_conf
248-
249-
250-
def read_config_yml(path):
251-
"""
252-
Read and process a YAML configuration file.
253-
254-
This function reads a YAML file, processes it to replace API key placeholders
255-
with environment variable values, and returns the processed configuration.
256-
257-
Args:
258-
path: The path to the YAML configuration file.
259-
260-
Returns:
261-
dict: The parsed and processed YAML content as a Python dictionary.
262-
"""
263-
with open(path) as f:
264-
configs = yaml.safe_load(f)
265-
recursively_modify_api_key(configs)
266-
return configs
267-
```
268-
269-
```py {title="datamodels.py" test="skip"}
270-
from enum import Enum
271-
272-
from pydantic import BaseModel
273-
274-
275-
class AgentDependencies(BaseModel):
276-
workflow_id: str
277-
run_id: str
192+
For a complete working example of streaming with Temporal using signals and queries, see the [temporal_streaming example](https://github.com/pydantic/pydantic-ai/tree/main/examples/pydantic_ai_examples/temporal_streaming). This example demonstrates:
278193

194+
- How to use an [`event_stream_handler`][pydantic_ai.agent.EventStreamHandler] to capture agent events in activities
195+
- Using Temporal signals to send events from activities to the workflow
196+
- Using Temporal queries to poll events from the workflow to the caller
197+
- Setting up dependencies to pass workflow identification for signal routing
198+
- Integrating MCP toolsets and custom tools with streaming
199+
- Complete project structure with all necessary files
279200

280-
class EventKind(str, Enum):
281-
CONTINUE_CHAT = 'continue_chat'
282-
EVENT = 'event'
283-
RESULT = 'result'
284-
285-
286-
class EventStream(BaseModel):
287-
kind: EventKind
288-
content: str
289-
```
290-
291-
292-
```py {title="agents.py" test="skip"}
293-
from datetime import timedelta
294-
295-
from mcp_run_python import code_sandbox
296-
from pydantic_ai import Agent, FilteredToolset, ModelSettings, RunContext
297-
from pydantic_ai.durable_exec.temporal import TemporalAgent
298-
from pydantic_ai.mcp import MCPServerStdio
299-
from pydantic_ai.models.anthropic import AnthropicModel
300-
from pydantic_ai.providers.anthropic import AnthropicProvider
301-
from temporalio.common import RetryPolicy
302-
from temporalio.workflow import ActivityConfig
303-
304-
from .datamodels import AgentDependencies
305-
306-
async def get_mcp_toolsets() -> dict[str, FilteredToolset]:
307-
yf_server = MCPServerStdio(
308-
command='uvx',
309-
args=['mcp-yahoo-finance'],
310-
timeout=240,
311-
read_timeout=240,
312-
id='yahoo'
313-
)
314-
return {
315-
'yahoo': yf_server.filtered(lambda ctx, tool_def: True)
316-
}
317-
318-
319-
async def get_claude_model(parallel_tool_calls: bool = True, **env_vars):
320-
model_name = 'claude-sonnet-4-5-20250929'
321-
api_key = env_vars.get('anthropic_api_key')
322-
model = AnthropicModel(model_name=model_name,
323-
provider=AnthropicProvider(api_key=api_key),
324-
settings=ModelSettings(**{
325-
'temperature': 0.5,
326-
'n': 1,
327-
'max_completion_tokens': 64000,
328-
'max_tokens': 64000,
329-
'parallel_tool_calls': parallel_tool_calls,
330-
}))
331-
332-
return model
333-
334-
335-
async def build_agent(stream_handler=None, **env_vars):
336-
system_prompt = """
337-
You are an expert travel agent that knows perfectly how to search for hotels on the web.
338-
You also have a Data Analyst background, mastering well how to use pandas for tabular operations.
339-
"""
340-
agent_name = 'YahooFinanceSearchAgent'
341-
342-
toolsets = await get_mcp_toolsets()
343-
agent = Agent(name=agent_name,
344-
model=await get_claude_model(**env_vars), # Here you place your Model instance
345-
toolsets=[*toolsets.values()],
346-
system_prompt=system_prompt,
347-
event_stream_handler=stream_handler,
348-
deps_type=AgentDependencies,
349-
)
350-
351-
@agent.tool(name='run_python_code')
352-
async def run_python_code(ctx: RunContext[None], code: str) -> str:
353-
async with code_sandbox(dependencies=['pandas', 'numpy']) as sandbox:
354-
result = await sandbox.eval(code)
355-
return result
356-
357-
temporal_agent = TemporalAgent(wrapped=agent,
358-
model_activity_config=ActivityConfig(
359-
start_to_close_timeout=timedelta(minutes=5),
360-
retry_policy=RetryPolicy(maximum_attempts=50)
361-
),
362-
toolset_activity_config={
363-
toolset_id: ActivityConfig(
364-
start_to_close_timeout=timedelta(minutes=3),
365-
retry_policy=RetryPolicy(maximum_attempts=3,
366-
non_retryable_error_types=['ToolRetryError']
367-
)
368-
) for toolset_id in toolsets.keys()})
369-
return temporal_agent
370-
```
371-
372-
```py {title="streaming_handler.py" test="skip"}
373-
from collections.abc import AsyncIterable
374-
375-
from .datamodels import AgentDependencies, EventKind, EventStream
376-
from temporalio import activity
377-
378-
from pydantic_ai import (
379-
AgentStreamEvent,
380-
FunctionToolCallEvent,
381-
PartStartEvent,
382-
FunctionToolResultEvent,
383-
TextPart,
384-
ToolCallPart,
385-
PartDeltaEvent,
386-
TextPartDelta,
387-
ThinkingPartDelta,
388-
)
389-
390-
391-
async def streaming_handler(ctx,
392-
event_stream_events: AsyncIterable[AgentStreamEvent]):
393-
"""
394-
This function is used by the agent to stream-out the actions that are being performed (tool calls, llm call, streaming results, etc etc.
395-
Feel free to change it as you like or need - skipping events or enriching the content
396-
"""
397-
398-
output = ''
399-
output_tool_delta = dict(
400-
tool_call_id='',
401-
tool_name_delta='',
402-
args_delta='',
403-
)
404-
# If TextPart and output delta is empty
405-
async for event in event_stream_events:
406-
if isinstance(event, PartStartEvent):
407-
if isinstance(event.part, TextPart):
408-
output += f'{event.part.content}'
409-
elif isinstance(event.part, ToolCallPart):
410-
output += f'\nTool Call Id: {event.part.tool_call_id}'
411-
output += f'\nTool Name: {event.part.tool_name}'
412-
output += f'\nTool Args: {event.part.args}'
413-
else:
414-
pass
415-
elif isinstance(event, FunctionToolCallEvent):
416-
output += f'\nTool Call Id: {event.part.tool_call_id}'
417-
output += f'\nTool Name: {event.part.tool_name}'
418-
output += f'\nTool Args: {event.part.args}'
419-
elif isinstance(event, FunctionToolResultEvent):
420-
output += f'\nTool Call Id: {event.result.tool_call_id}'
421-
output += f'\nTool Name: {event.result.tool_name}'
422-
output += f'\nContent: {event.result.content}'
423-
elif isinstance(event, PartDeltaEvent):
424-
if isinstance(event.delta, TextPartDelta) or isinstance(event.delta, ThinkingPartDelta):
425-
output += f'{event.delta.content_delta}'
426-
else:
427-
if len(output_tool_delta['tool_call_id']) == 0:
428-
output_tool_delta['tool_call_id'] += event.delta.tool_call_id or ''
429-
output_tool_delta['tool_name_delta'] += event.delta.tool_name_delta or ''
430-
output_tool_delta['args_delta'] += event.delta.args_delta or ''
431-
432-
if len(output_tool_delta['tool_call_id']):
433-
output += f'\nTool Call Id: {output_tool_delta["tool_call_id"]}'
434-
output += f'\nTool Name: {output_tool_delta["tool_name_delta"]}'
435-
output += f'\nTool Args: {output_tool_delta["args_delta"]}'
436-
437-
events = []
438-
439-
if output:
440-
event = EventStream(kind=EventKind.EVENT, content=output)
441-
events.append(event)
442-
443-
if activity.in_activity():
444-
deps: AgentDependencies = ctx.deps
445-
446-
workflow_id = deps.workflow_id
447-
run_id = deps.run_id
448-
workflow_handle = activity.client().get_workflow_handle(workflow_id=workflow_id, run_id=run_id)
449-
for event in events:
450-
await workflow_handle.signal('append_event', arg=event)
451-
```
452-
453-
```py {title="workflow.py" test="skip"}
454-
import asyncio
455-
from collections import deque
456-
from datetime import timedelta
457-
from typing import Any
458-
459-
from pydantic_ai import UsageLimits
460-
from temporalio import activity, workflow
461-
462-
from .agents import build_agent, streaming_handler
463-
from .datamodels import AgentDependencies, EventKind, EventStream
464-
465-
@workflow.defn
466-
class YahooFinanceSearchWorkflow:
467-
def __init__(self):
468-
self.events: deque[EventStream] = deque()
469-
470-
@workflow.run
471-
async def run(self, user_prompt: str):
472-
473-
wf_vars = await workflow.execute_activity(
474-
activity='retrieve_env_vars',
475-
start_to_close_timeout=timedelta(seconds=10),
476-
result_type=dict[str, Any],
477-
)
478-
deps = AgentDependencies(workflow_id=workflow.info().workflow_id, run_id=workflow.info().run_id)
479-
480-
agent = await build_agent(streaming_handler, **wf_vars)
481-
result = await agent.run(user_prompt=user_prompt,
482-
usage_limits=UsageLimits(request_limit=50),
483-
deps=deps
484-
)
485-
486-
await self.append_event(event_stream=EventStream(kind=EventKind.RESULT,
487-
content=result.output))
488-
489-
await self.append_event(event_stream=EventStream(kind=EventKind.CONTINUE_CHAT,
490-
content=''))
491-
492-
try:
493-
await workflow.wait_condition(
494-
lambda: len(self.events) == 0,
495-
timeout=timedelta(seconds=10),
496-
timeout_summary='Waiting for events to be consumed'
497-
)
498-
return result.output
499-
except asyncio.TimeoutError:
500-
return result.output
501-
502-
@staticmethod
503-
@activity.defn(name='retrieve_env_vars')
504-
async def retrieve_env_vars():
505-
import os
506-
from .utils import read_config_yml
507-
508-
config_path = os.getenv('APP_CONFIG_PATH', './app_conf.yml')
509-
configs = read_config_yml(config_path)
510-
return {
511-
'anthropic_api_key': configs['llm']['anthropic_api_key']
512-
}
513-
514-
@workflow.query
515-
def event_stream(self) -> EventStream | None:
516-
if self.events:
517-
return self.events.popleft()
518-
return None
519-
520-
@workflow.signal
521-
async def append_event(self, event_stream: EventStream):
522-
# This signal is invoked by streaming_handler, pushing event for every async loop
523-
self.events.append(event_stream)
524-
```
201+
The example includes a Yahoo Finance search agent with Python code execution capabilities, showing how to stream tool calls, model responses, and results in real-time during workflow execution.
525202

526203

527204

0 commit comments

Comments
 (0)