Skip to content

Commit 3a72043

Browse files
committed
chore: demonstrate FunctionTool use in a (temporal) tutorial
1 parent c9eb040 commit 3a72043

File tree

1 file changed

+149
-28
lines changed
  • examples/tutorials/10_agentic/10_temporal/010_agent_chat/project

1 file changed

+149
-28
lines changed

examples/tutorials/10_agentic/10_temporal/010_agent_chat/project/workflow.py

Lines changed: 149 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,50 @@
11
import os
2-
from typing import Dict, List, override
2+
import json
3+
from typing import Dict, List, override, Any
34
from dotenv import load_dotenv
45

5-
from dotenv import load_dotenv
66
from agentex.lib.utils.model_utils import BaseModel
77
from mcp import StdioServerParameters
88
from temporalio import workflow
9-
from agents import ModelSettings
9+
from agents import ModelSettings, RunContextWrapper
1010
from openai.types.shared import Reasoning
1111

1212
from agentex.lib import adk
1313
from agentex.lib.types.acp import CreateTaskParams, SendEventParams
1414
from agentex.lib.core.temporal.workflows.workflow import BaseWorkflow
1515
from agentex.lib.core.temporal.types.workflow import SignalName
1616
from agentex.lib.utils.logging import make_logger
17-
from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config
17+
from agentex.lib.core.tracing.tracing_processor_manager import (
18+
add_tracing_processor_config,
19+
)
1820
from agentex.lib.types.tracing import SGPTracingProcessorConfig
1921
from agentex.lib.environment_variables import EnvironmentVariables
2022
from agentex.types.text_content import TextContent
23+
from agentex.lib.core.temporal.activities.adk.providers.openai_activities import (
24+
FunctionTool,
25+
)
2126

2227
environment_variables = EnvironmentVariables.refresh()
2328
load_dotenv(dotenv_path=".env")
2429

25-
add_tracing_processor_config(SGPTracingProcessorConfig(
26-
sgp_api_key=os.environ.get("SCALE_GP_API_KEY", ""),
27-
sgp_account_id=os.environ.get("SCALE_GP_ACCOUNT_ID", ""),
28-
))
30+
add_tracing_processor_config(
31+
SGPTracingProcessorConfig(
32+
sgp_api_key=os.environ.get("SCALE_GP_API_KEY", ""),
33+
sgp_account_id=os.environ.get("SCALE_GP_ACCOUNT_ID", ""),
34+
)
35+
)
2936

30-
if environment_variables.WORKFLOW_NAME is None:
37+
if not environment_variables.WORKFLOW_NAME:
3138
raise ValueError("Environment variable WORKFLOW_NAME is not set")
3239

33-
if environment_variables.AGENT_NAME is None:
40+
if not environment_variables.AGENT_NAME:
3441
raise ValueError("Environment variable AGENT_NAME is not set")
3542

3643
logger = make_logger(__name__)
3744

3845

3946
class StateModel(BaseModel):
40-
input_list: List[Dict]
47+
input_list: List[Dict[str, Any]]
4148
turn_number: int
4249

4350

@@ -49,44 +56,139 @@ class StateModel(BaseModel):
4956
StdioServerParameters(
5057
command="uvx",
5158
args=["openai-websearch-mcp"],
52-
env={
53-
"OPENAI_API_KEY": os.environ.get("OPENAI_API_KEY", "")
54-
}
59+
env={"OPENAI_API_KEY": os.environ.get("OPENAI_API_KEY", "")},
5560
),
5661
]
5762

63+
64+
async def calculator(context: RunContextWrapper, args: str) -> str:
65+
"""
66+
Simple calculator that can perform basic arithmetic operations.
67+
68+
Args:
69+
context: The run context wrapper
70+
args: JSON string containing the operation and operands
71+
72+
Returns:
73+
String representation of the calculation result
74+
"""
75+
try:
76+
# Parse the JSON arguments
77+
parsed_args = json.loads(args)
78+
operation = parsed_args.get("operation")
79+
a = parsed_args.get("a")
80+
b = parsed_args.get("b")
81+
82+
if operation is None or a is None or b is None:
83+
return (
84+
"Error: Missing required parameters. "
85+
"Please provide 'operation', 'a', and 'b'."
86+
)
87+
88+
# Convert to numbers
89+
try:
90+
a = float(a)
91+
b = float(b)
92+
except (ValueError, TypeError):
93+
return "Error: 'a' and 'b' must be valid numbers."
94+
95+
# Perform the calculation
96+
if operation == "add":
97+
result = a + b
98+
elif operation == "subtract":
99+
result = a - b
100+
elif operation == "multiply":
101+
result = a * b
102+
elif operation == "divide":
103+
if b == 0:
104+
return "Error: Division by zero is not allowed."
105+
result = a / b
106+
else:
107+
supported_ops = "add, subtract, multiply, divide"
108+
return (
109+
f"Error: Unknown operation '{operation}'. "
110+
f"Supported operations: {supported_ops}."
111+
)
112+
113+
# Format the result nicely
114+
if result == int(result):
115+
return f"The result of {a} {operation} {b} is {int(result)}"
116+
else:
117+
formatted = f"{result:.6f}".rstrip("0").rstrip(".")
118+
return f"The result of {a} {operation} {b} is {formatted}"
119+
120+
except json.JSONDecodeError:
121+
return "Error: Invalid JSON format in arguments."
122+
except Exception as e:
123+
return f"Error: An unexpected error occurred: {str(e)}"
124+
125+
126+
# Create the calculator tool
127+
CALCULATOR_TOOL = FunctionTool(
128+
name="calculator",
129+
description=(
130+
"Performs basic arithmetic operations (add, subtract, multiply, divide) "
131+
"on two numbers."
132+
),
133+
params_json_schema={
134+
"type": "object",
135+
"properties": {
136+
"operation": {
137+
"type": "string",
138+
"enum": ["add", "subtract", "multiply", "divide"],
139+
"description": "The arithmetic operation to perform",
140+
},
141+
"a": {"type": "number", "description": "The first number"},
142+
"b": {"type": "number", "description": "The second number"},
143+
},
144+
"required": ["operation", "a", "b"],
145+
"additionalProperties": False,
146+
},
147+
strict_json_schema=True,
148+
on_invoke_tool=calculator,
149+
)
150+
151+
58152
@workflow.defn(name=environment_variables.WORKFLOW_NAME)
59153
class At010AgentChatWorkflow(BaseWorkflow):
60154
"""
61155
Minimal async workflow template for AgentEx Temporal agents.
62156
"""
157+
63158
def __init__(self):
64159
super().__init__(display_name=environment_variables.AGENT_NAME)
65160
self._complete_task = False
66-
self._state = None
161+
self._state: StateModel | None = None
67162

68163
@workflow.signal(name=SignalName.RECEIVE_EVENT)
69164
@override
70165
async def on_task_event_send(self, params: SendEventParams) -> None:
71166
logger.info(f"Received task message instruction: {params}")
72-
167+
73168
if not params.event.content:
74169
return
75170
if params.event.content.type != "text":
76171
raise ValueError(f"Expected text message, got {params.event.content.type}")
77172

78173
if params.event.content.author != "user":
79-
raise ValueError(f"Expected user message, got {params.event.content.author}")
80-
174+
raise ValueError(
175+
f"Expected user message, got {params.event.content.author}"
176+
)
177+
178+
if self._state is None:
179+
raise ValueError("State is not initialized")
180+
81181
# Increment the turn number
82182
self._state.turn_number += 1
83183
# Add the new user message to the message history
84-
self._state.input_list.append({"role": "user", "content": params.event.content.content})
184+
self._state.input_list.append(
185+
{"role": "user", "content": params.event.content.content}
186+
)
85187

86188
async with adk.tracing.span(
87189
trace_id=params.task.id,
88190
name=f"Turn {self._state.turn_number}",
89-
input=self._state
191+
input=self._state,
90192
) as span:
91193
# Echo back the user's message so it shows up in the UI. This is not done by default so the agent developer has full control over what is shown to the user.
92194
await adk.messages.create(
@@ -102,7 +204,15 @@ async def on_task_event_send(self, params: SendEventParams) -> None:
102204
trace_id=params.task.id,
103205
content=TextContent(
104206
author="agent",
105-
content="Hey, sorry I'm unable to respond to your message because you're running this example without an OpenAI API key. Please set the OPENAI_API_KEY environment variable to run this example. Do this by either by adding a .env file to the project/ directory or by setting the environment variable in your terminal.",
207+
content=(
208+
"Hey, sorry I'm unable to respond to your message "
209+
"because you're running this example without an "
210+
"OpenAI API key. Please set the OPENAI_API_KEY "
211+
"environment variable to run this example. Do this "
212+
"by either by adding a .env file to the project/ "
213+
"directory or by setting the environment variable "
214+
"in your terminal."
215+
),
106216
),
107217
parent_span_id=span.id if span else None,
108218
)
@@ -115,22 +225,33 @@ async def on_task_event_send(self, params: SendEventParams) -> None:
115225
input_list=self._state.input_list,
116226
mcp_server_params=MCP_SERVERS,
117227
agent_name="Tool-Enabled Assistant",
118-
agent_instructions="""You are a helpful assistant that can answer questions using various tools.
119-
You have access to sequential thinking and web search capabilities through MCP servers.
120-
Use these tools when appropriate to provide accurate and well-reasoned responses.""",
228+
agent_instructions=(
229+
"You are a helpful assistant that can answer questions "
230+
"using various tools. You have access to sequential "
231+
"thinking and web search capabilities through MCP servers, "
232+
"as well as a calculator tool for performing basic "
233+
"arithmetic operations. Use these tools when appropriate "
234+
"to provide accurate and well-reasoned responses."
235+
),
121236
parent_span_id=span.id if span else None,
122237
model="o4-mini",
123238
model_settings=ModelSettings(
124239
# Include reasoning items in the response (IDs, summaries)
125240
# response_include=["reasoning.encrypted_content"],
126241
# Ask the model to include a short reasoning summary
127242
reasoning=Reasoning(effort="medium", summary="auto"),
128-
)
243+
),
244+
tools=[CALCULATOR_TOOL],
129245
)
130-
self._state.input_list = run_result.final_input_list
246+
if self._state:
247+
# Update the state with the final input list if available
248+
final_list = getattr(run_result, "final_input_list", None)
249+
if final_list is not None:
250+
self._state.input_list = final_list
131251

132252
# Set the span output to the state for the next turn
133-
span.output = self._state
253+
if span and self._state:
254+
span.output = self._state.model_dump()
134255

135256
@workflow.run
136257
@override
@@ -151,5 +272,5 @@ async def on_task_create(self, params: CreateTaskParams) -> None:
151272

152273
await workflow.wait_condition(
153274
lambda: self._complete_task,
154-
timeout=None, # Set a timeout if you want to prevent the task from running indefinitely. Generally this is not needed. Temporal can run hundreds of millions of workflows in parallel and more. Only do this if you have a specific reason to do so.
275+
timeout=None, # Set a timeout if you want to prevent the task from running indefinitely. Generally this is not needed. Temporal can run hundreds of millions of workflows in parallel and more. Only do this if you have a specific reason to do so.
155276
)

0 commit comments

Comments
 (0)