Skip to content

Commit 86f5b55

Browse files
committed
[Temporal - Documentation] Fixing PR errors
1 parent d15723c commit 86f5b55

File tree

5 files changed

+46
-52
lines changed

5 files changed

+46
-52
lines changed

examples/pydantic_ai_examples/temporal_streaming/agents.py

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,17 @@
33
This module defines the agent setup with MCP toolsets, model configuration,
44
and custom tools for data analysis.
55
"""
6-
76
from datetime import timedelta
87

98
from temporalio.common import RetryPolicy
109
from temporalio.workflow import ActivityConfig
1110

1211
from pydantic_ai import Agent, FilteredToolset, ModelSettings
12+
from pydantic_ai.agent import EventStreamHandler
1313
from pydantic_ai.durable_exec.temporal import TemporalAgent
1414
from pydantic_ai.mcp import MCPServerStdio
1515
from pydantic_ai.models.anthropic import AnthropicModel
1616
from pydantic_ai.providers.anthropic import AnthropicProvider
17-
1817
from .datamodels import AgentDependencies
1918

2019

@@ -35,43 +34,40 @@ async def get_mcp_toolsets() -> dict[str, FilteredToolset]:
3534
return {'yahoo': yf_server.filtered(lambda ctx, tool_def: True)}
3635

3736

38-
async def get_claude_model(parallel_tool_calls: bool = True, **env_vars):
37+
async def get_claude_model(parallel_tool_calls: bool = True, **kwargs):
3938
"""
4039
Create and configure the Claude model.
4140
4241
Args:
4342
parallel_tool_calls: Whether to enable parallel tool calls.
44-
**env_vars: Environment variables including API keys.
43+
**kwargs: Environment variables including API keys.
4544
4645
Returns:
4746
Configured AnthropicModel instance.
4847
"""
49-
model_name = 'claude-sonnet-4-5-20250929'
50-
api_key = env_vars.get('anthropic_api_key')
51-
model = AnthropicModel(
48+
model_name: str = 'claude-sonnet-4-5-20250929'
49+
api_key: str | None = kwargs.get('anthropic_api_key', None)
50+
model: AnthropicModel = AnthropicModel(
5251
model_name=model_name,
5352
provider=AnthropicProvider(api_key=api_key),
5453
settings=ModelSettings(
55-
**{
56-
'temperature': 0.5,
57-
'n': 1,
58-
'max_completion_tokens': 64000,
59-
'max_tokens': 64000,
60-
'parallel_tool_calls': parallel_tool_calls,
61-
}
54+
temperature=0.5,
55+
max_tokens=64000,
56+
parallel_tool_calls=parallel_tool_calls,
6257
),
6358
)
6459

6560
return model
6661

6762

68-
async def build_agent(stream_handler=None, **env_vars):
63+
async def build_agent(stream_handler: EventStreamHandler,
64+
**kwargs) -> TemporalAgent:
6965
"""
7066
Build and configure the agent with tools and temporal settings.
7167
7268
Args:
7369
stream_handler: Optional event stream handler for streaming responses.
74-
**env_vars: Environment variables including API keys.
70+
**kwargs: Environment variables including API keys.
7571
7672
Returns:
7773
TemporalAgent instance ready for use in Temporal workflows.
@@ -84,7 +80,7 @@ async def build_agent(stream_handler=None, **env_vars):
8480
toolsets = await get_mcp_toolsets()
8581
agent = Agent(
8682
name=agent_name,
87-
model=await get_claude_model(**env_vars),
83+
model=await get_claude_model(**kwargs),
8884
toolsets=[*toolsets.values()],
8985
system_prompt=system_prompt,
9086
event_stream_handler=stream_handler,

examples/pydantic_ai_examples/temporal_streaming/main.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,27 +7,28 @@
77
import asyncio
88
import os
99
import uuid
10+
from typing import Any, Union
1011

11-
from pydantic_ai.durable_exec.temporal import AgentPlugin, PydanticAIPlugin
12-
from temporalio.client import Client
12+
from temporalio.client import Client, WorkflowHandle
1313
from temporalio.worker import Worker
1414

15+
from pydantic_ai.durable_exec.temporal import AgentPlugin, PydanticAIPlugin
1516
from .agents import build_agent
1617
from .datamodels import EventKind, EventStream
1718
from .streaming_handler import streaming_handler
1819
from .utils import read_config_yml
1920
from .workflow import YahooFinanceSearchWorkflow
2021

2122

22-
async def poll_events(workflow_handle):
23+
async def poll_events(workflow_handle: WorkflowHandle[Any, str]) -> None:
2324
"""
2425
Poll for events from the workflow and print them.
2526
2627
Args:
2728
workflow_handle: Handle to the running workflow.
2829
"""
2930
while True:
30-
event = await workflow_handle.query('event_stream', result_type=EventStream | None)
31+
event = await workflow_handle.query('event_stream', result_type=Union[EventStream | None])
3132
if event is None:
3233
await asyncio.sleep(0.1)
3334
continue
@@ -41,7 +42,7 @@ async def poll_events(workflow_handle):
4142
print(f'\n--- Event ---\n{event.content}\n')
4243

4344

44-
async def main():
45+
async def main() -> None:
4546
"""
4647
Main function to set up and run the Temporal workflow.
4748
@@ -76,11 +77,12 @@ async def main():
7677
):
7778
# Execute the workflow
7879
workflow_id = f'yahoo-finance-search-{uuid.uuid4()}'
79-
workflow_handle = await client.start_workflow(
80-
YahooFinanceSearchWorkflow.run,
81-
args=['What are the latest financial metrics for Apple (AAPL)?'],
80+
workflow_handle: WorkflowHandle[Any, str] = await client.start_workflow(
81+
'YahooFinanceSearchWorkflow',
82+
arg='What are the latest financial metrics for Apple (AAPL)?',
8283
id=workflow_id,
8384
task_queue=task_queue,
85+
result_type=str
8486
)
8587

8688
print(f'Started workflow with ID: {workflow_id}')

examples/pydantic_ai_examples/temporal_streaming/streaming_handler.py

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66

77
from collections.abc import AsyncIterable
88

9+
from temporalio import activity
10+
from temporalio.client import WorkflowHandle
11+
912
from pydantic_ai import (
1013
AgentStreamEvent,
1114
FunctionToolCallEvent,
@@ -15,17 +18,15 @@
1518
TextPart,
1619
TextPartDelta,
1720
ThinkingPartDelta,
18-
ToolCallPart,
21+
ToolCallPart, RunContext,
1922
)
20-
from temporalio import activity
21-
2223
from .datamodels import AgentDependencies, EventKind, EventStream
2324

2425

2526
async def streaming_handler(
26-
ctx,
27-
event_stream_events: AsyncIterable[AgentStreamEvent],
28-
):
27+
ctx: RunContext,
28+
event_stream_events: AsyncIterable[AgentStreamEvent],
29+
) -> None:
2930
"""
3031
Handle streaming events from the agent.
3132
@@ -37,8 +38,11 @@ async def streaming_handler(
3738
ctx: The run context containing dependencies.
3839
event_stream_events: Async iterable of agent stream events.
3940
"""
40-
output = ''
41-
output_tool_delta = dict(
41+
if not activity.in_activity():
42+
return
43+
44+
output: str = ''
45+
output_tool_delta: dict[str, str] = dict(
4246
tool_call_id='',
4347
tool_name_delta='',
4448
args_delta='',
@@ -78,19 +82,10 @@ async def streaming_handler(
7882
output += f'\nTool Name: {output_tool_delta["tool_name_delta"]}'
7983
output += f'\nTool Args: {output_tool_delta["args_delta"]}'
8084

81-
events = []
82-
83-
# Create event stream if there's output
84-
if output:
85-
event = EventStream(kind=EventKind.EVENT, content=output)
86-
events.append(event)
87-
8885
# Send events to workflow if running in an activity
89-
if activity.in_activity():
90-
deps: AgentDependencies = ctx.deps
86+
deps: AgentDependencies = ctx.deps
9187

92-
workflow_id = deps.workflow_id
93-
run_id = deps.run_id
94-
workflow_handle = activity.client().get_workflow_handle(workflow_id=workflow_id, run_id=run_id)
95-
for event in events:
96-
await workflow_handle.signal('append_event', arg=event)
88+
workflow_id: str = deps.workflow_id
89+
run_id: str = deps.run_id
90+
workflow_handle: WorkflowHandle = activity.client().get_workflow_handle(workflow_id=workflow_id, run_id=run_id)
91+
await workflow_handle.signal('append_event', arg=EventStream(kind=EventKind.EVENT, content=output))

examples/pydantic_ai_examples/temporal_streaming/utils.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@
22

33
import os
44
from copy import copy
5+
from typing import Any
56

67
import yaml
78

89

9-
def recursively_modify_api_key(conf):
10+
def recursively_modify_api_key(conf) -> dict[str, Any]:
1011
"""
1112
Recursively replace API key placeholders with environment variable values.
1213
@@ -41,7 +42,7 @@ def inner(_conf):
4142
return copy_conf
4243

4344

44-
def read_config_yml(path):
45+
def read_config_yml(path) -> dict[str, Any]:
4546
"""
4647
Read and process a YAML configuration file.
4748

examples/pydantic_ai_examples/temporal_streaming/workflow.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@
1010
from datetime import timedelta
1111
from typing import Any
1212

13-
from pydantic_ai import UsageLimits
1413
from temporalio import activity, workflow
1514

15+
from pydantic_ai import UsageLimits
1616
from .agents import build_agent
1717
from .datamodels import AgentDependencies, EventKind, EventStream
1818
from .streaming_handler import streaming_handler
@@ -33,7 +33,7 @@ def __init__(self):
3333
self.events: deque[EventStream] = deque()
3434

3535
@workflow.run
36-
async def run(self, user_prompt: str):
36+
async def run(self, user_prompt: str) -> str:
3737
"""
3838
Execute the agent with the given user prompt.
3939
@@ -78,7 +78,7 @@ async def run(self, user_prompt: str):
7878

7979
@staticmethod
8080
@activity.defn(name='retrieve_env_vars')
81-
async def retrieve_env_vars():
81+
async def retrieve_env_vars() -> dict[str, Any]:
8282
"""
8383
Retrieve environment variables from configuration file.
8484

0 commit comments

Comments
 (0)