Skip to content

Commit d64cf65

Browse files
feat: introducing special activities due to temporal async constraints
1 parent 86993cd commit d64cf65

File tree

8 files changed

+210
-87
lines changed

8 files changed

+210
-87
lines changed
Lines changed: 52 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
# Simplified OpenAI Agent Chat - Agent Platform Integration
1+
# OpenAI Temporal Integration Tutorial
22

3-
This tutorial demonstrates the new **Agent Platform Integration** for Agentex that dramatically simplifies agent development while preserving all Agentex infrastructure benefits.
3+
This tutorial demonstrates the **Agent Platform Integration** for Agentex that provides a streamlined approach to agent development while maintaining full Agentex infrastructure compatibility.
44

55
## Before vs After Comparison
66

@@ -13,31 +13,30 @@ This tutorial demonstrates the new **Agent Platform Integration** for Agentex th
1313
| **Error handling** | Manual try/catch and retry logic | Built-in recovery |
1414
| **ACP integration** | Manual message creation/sending | Automatic via bridge |
1515

16-
## Key Benefits
16+
## Key Features
1717

18-
### 🚀 **Dramatically Reduced Complexity**
19-
- **90% reduction in code** - from 277 lines to ~30 lines
20-
- **No manual orchestration** - agent execution is automatically durable
21-
- **No activity definitions** - tool calls are automatically temporal activities
18+
### **Reduced Complexity**
19+
- Simplified codebase: from 277 lines to ~30 lines
20+
- Automatic agent execution durability
21+
- Built-in tool call orchestration
2222

23-
### 🔧 **Preserved Agentex Infrastructure**
24-
- **ACP protocol compatibility** - external clients unchanged
25-
- **Kubernetes deployment** - same Helm charts and configs
26-
- **Multi-tenant hosting** - same agent discovery and routing
27-
- **Authentication & monitoring** - same observability stack
23+
### **Infrastructure Compatibility**
24+
- Full ACP protocol compatibility
25+
- Existing deployment configurations work unchanged
26+
- Same authentication and monitoring systems
27+
- Multi-tenant hosting support maintained
2828

29-
### 🎯 **Platform Agnostic Design**
30-
- **OpenAI Agents SDK** - this tutorial (implemented)
31-
- **LangChain** - future extension point
32-
- **CrewAI** - future extension point
33-
- **Custom frameworks** - extensible via strategy pattern
29+
### **Platform Extensibility**
30+
- OpenAI Agents SDK integration (implemented)
31+
- Extensible architecture for LangChain, CrewAI
32+
- Strategy pattern for custom frameworks
3433

3534
## Implementation Details
3635

3736
### Workflow Definition
3837
```python
3938
@workflow.defn(name=environment_variables.WORKFLOW_NAME)
40-
class SimplifiedOpenAIChatAgent(OpenAIAgentWorkflow):
39+
class At040OpenAITemporalIntegration(OpenAIAgentWorkflow):
4140
async def create_agent(self) -> Agent:
4241
return Agent(
4342
name="Tool-Enabled Assistant",
@@ -51,41 +50,41 @@ class SimplifiedOpenAIChatAgent(OpenAIAgentWorkflow):
5150
```python
5251
worker = AgentexWorker(
5352
task_queue=environment_variables.WORKFLOW_TASK_QUEUE,
54-
agent_platform="openai", # Automatic optimization
53+
agent_platform="openai", # Platform optimization
5554
)
56-
await worker.run(activities=[], workflow=SimplifiedOpenAIChatAgent)
55+
await worker.run(activities=[], workflow=At040OpenAITemporalIntegration)
5756
```
5857

59-
## Architecture Benefits
58+
## Technical Architecture
6059

61-
### Automatic Durability
62-
- **Agent executions** become Temporal activities automatically
63-
- **Tool calls** are durable with automatic retries
64-
- **Conversation state** persists across workflow restarts
60+
### Durability Features
61+
- Agent executions are automatically temporal activities
62+
- Tool calls include built-in retry mechanisms
63+
- Conversation state persists across workflow restarts
6564

66-
### Performance Optimizations
67-
- **Activity exclusion** - OpenAI provider activities automatically excluded
68-
- **Direct SDK integration** - bypasses activity overhead for simple cases
69-
- **Platform-specific configuration** - optimized worker settings per platform
65+
### Performance Features
66+
- Automatic exclusion of unused provider activities
67+
- Direct SDK integration reduces overhead
68+
- Platform-specific worker configuration
7069

71-
### Future Extensibility
72-
- **Strategy pattern** - easy to add new agent platforms
73-
- **Unified interface** - same workflow pattern across all platforms
74-
- **Agentex compatibility** - seamless integration with existing infrastructure
70+
### Extensibility
71+
- Strategy pattern for adding new agent platforms
72+
- Consistent workflow interface across platforms
73+
- Full compatibility with existing Agentex infrastructure
7574

7675
## Running the Tutorial
7776

7877
1. **Set environment variables:**
7978
```bash
80-
export WORKFLOW_NAME="simplified-openai-chat"
81-
export WORKFLOW_TASK_QUEUE="simplified_openai_chat_queue"
82-
export AGENT_NAME="simplified-openai-chat"
79+
export WORKFLOW_NAME="at040-openai-temporal-integration"
80+
export WORKFLOW_TASK_QUEUE="040_openai_temporal_integration_queue"
81+
export AGENT_NAME="at040-openai-temporal-integration"
8382
export OPENAI_API_KEY="your-openai-api-key"
8483
```
8584

86-
2. **Start the worker:**
85+
2. **Run the agent:**
8786
```bash
88-
python project/run_worker.py
87+
uv run agentex agents run --manifest manifest.yaml
8988
```
9089

9190
3. **Test via ACP API:**
@@ -95,29 +94,29 @@ await worker.run(activities=[], workflow=SimplifiedOpenAIChatAgent)
9594
-d '{
9695
"method": "task/create",
9796
"params": {
98-
"agent_name": "simplified-openai-chat"
97+
"agent_name": "at040-openai-temporal-integration"
9998
}
10099
}'
101100
```
102101

103-
## Migration Guide
102+
## Migration from Manual Approach
104103

105-
To migrate from the complex manual approach to this simplified approach:
104+
To migrate from the manual orchestration pattern (010_agent_chat):
106105

107-
1. **Replace workflow inheritance:**
108-
- From: `BaseWorkflow`
109-
- To: `OpenAIAgentWorkflow` (or other platform workflow)
106+
1. **Update workflow inheritance:**
107+
- Change from: `BaseWorkflow`
108+
- Change to: `OpenAIAgentWorkflow`
110109

111-
2. **Replace manual orchestration:**
112-
- From: Manual `adk.providers.openai.run_agent_streamed_auto_send()`
113-
- To: Simple `create_agent()` implementation
110+
2. **Replace orchestration code:**
111+
- Remove: Manual `adk.providers.openai.run_agent_streamed_auto_send()` calls
112+
- Add: `create_agent()` method implementation
114113

115114
3. **Update worker configuration:**
116-
- Add: `agent_platform="openai"` parameter
117-
- Remove: Manual activity registration
115+
- Add: `agent_platform="openai"` parameter to `AgentexWorker`
116+
- Activities: Use empty list `[]` for automatic optimization
118117

119-
4. **Remove manual activities:**
120-
- Delete: Custom `@activity.defn` wrappers
121-
- Keep: Core business logic in simple functions
118+
4. **Simplify activity management:**
119+
- Remove: Custom `@activity.defn` wrapper functions
120+
- Retain: Core business logic as regular functions
122121

123-
This approach maintains 100% compatibility with existing Agentex infrastructure while dramatically simplifying development.
122+
This maintains full compatibility with existing Agentex infrastructure.

examples/tutorials/10_agentic/10_temporal/040_openai_temporal_integration/manifest.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ build:
3030
# Only used when running the agent locally
3131
local_development:
3232
agent:
33-
port: 8000 # Port where your local ACP server is running
33+
port: 18000 # Port where your local ACP server is running
3434
host_address: host.docker.internal # Host address for Docker networking
3535

3636
# File paths for local development (relative to this manifest.yaml)

examples/tutorials/10_agentic/10_temporal/040_openai_temporal_integration/project/workflow.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
from agentex.lib.core.temporal.agent_platforms.workflow import OpenAIAgentWorkflow
2121
from agentex.lib.environment_variables import EnvironmentVariables
22+
from agentex.lib.types.acp import CreateTaskParams
2223
from agents import Agent
2324

2425
environment_variables = EnvironmentVariables.refresh()
@@ -36,6 +37,11 @@ class At040OpenAITemporalIntegration(OpenAIAgentWorkflow):
3637
agent configuration approach.
3738
"""
3839

40+
@workflow.run
41+
async def on_task_create(self, params: CreateTaskParams) -> None:
42+
"""Task creation handler - delegates to platform base class"""
43+
await super().on_task_create(params)
44+
3945
async def create_agent(self) -> Agent:
4046
"""
4147
Define the OpenAI agent configuration.

src/agentex/lib/core/temporal/activities/__init__.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,18 @@ def get_all_activities(sgp_client=None, exclude_platforms=None):
212212
openai_activities.run_agent_auto_send,
213213
openai_activities.run_agent_streamed_auto_send,
214214
])
215+
else:
216+
# When excluding OpenAI provider activities, include agent platform activities
217+
from agentex.lib.core.temporal.agent_platforms import (
218+
openai_agent_execution,
219+
langchain_agent_execution,
220+
crewai_agent_execution,
221+
)
222+
activities.extend([
223+
openai_agent_execution,
224+
langchain_agent_execution,
225+
crewai_agent_execution,
226+
])
215227

216228
# SGP activities
217229
if sgp_client is not None:

src/agentex/lib/core/temporal/agent_platforms/__init__.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,18 @@
44
from .registry import AgentPlatformRegistry
55
from .workflow import AgentPlatformWorkflow
66
from .tools import AgentexToolAdapter, create_openai_tools_from_activities, create_search_tool
7+
from .activities import openai_agent_execution, langchain_agent_execution, crewai_agent_execution
78

89
__all__ = [
910
"ACPAgentBridge",
1011
"AgentExecutionStrategy",
11-
"OpenAIExecutionStrategy",
12+
"OpenAIExecutionStrategy",
1213
"AgentPlatformRegistry",
1314
"AgentPlatformWorkflow",
1415
"AgentexToolAdapter",
1516
"create_openai_tools_from_activities",
1617
"create_search_tool",
18+
"openai_agent_execution",
19+
"langchain_agent_execution",
20+
"crewai_agent_execution",
1721
]
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
"""
2+
Agent platform activities - Temporal activities for platform-specific agent execution.
3+
4+
These activities provide the proper async execution context for different agent
5+
frameworks that may not work directly within Temporal workflow constraints.
6+
"""
7+
8+
from typing import Any, Dict
9+
10+
from temporalio import activity
11+
from agents import Agent, Runner, ModelSettings
12+
13+
from agentex.lib.utils.logging import make_logger
14+
15+
logger = make_logger(__name__)
16+
17+
@activity.defn(name="openai_agent_execution")
18+
async def openai_agent_execution(params: Dict[str, Any]) -> str:
19+
"""
20+
Execute OpenAI agent in proper async activity context.
21+
22+
OpenAI Agents SDK requires full async context including executor access
23+
which isn't available in Temporal workflow execution environment.
24+
25+
Args:
26+
params: Dictionary with agent_config, user_input, task_id
27+
28+
Returns:
29+
str: Agent's response output
30+
"""
31+
agent_config = params["agent_config"]
32+
user_input = params["user_input"]
33+
task_id = params["task_id"]
34+
35+
try:
36+
# Handle serialized config - convert dicts back to proper types
37+
cleaned_config = {}
38+
for key, value in agent_config.items():
39+
if key == "model_settings" and isinstance(value, dict):
40+
# Convert dict back to ModelSettings instance
41+
cleaned_config[key] = ModelSettings(**value)
42+
else:
43+
cleaned_config[key] = value
44+
45+
# Create agent from cleaned config
46+
agent = Agent(**cleaned_config)
47+
48+
# Execute agent with user input in proper async context
49+
logger.debug(f"Executing OpenAI agent for task {task_id} with input: {user_input[:100]}...")
50+
result = await Runner.run(starting_agent=agent, input=user_input)
51+
52+
# Extract final output
53+
output = result.final_output if hasattr(result, 'final_output') else str(result)
54+
55+
logger.debug(f"OpenAI agent execution completed for task {task_id}")
56+
return output
57+
58+
except Exception as e:
59+
logger.error(f"OpenAI agent execution failed for task {task_id}: {e}")
60+
raise
61+
62+
63+
@activity.defn(name="langchain_agent_execution")
64+
async def langchain_agent_execution(params: Dict[str, Any]) -> str:
65+
"""
66+
Execute LangChain agent in proper async activity context (future implementation).
67+
"""
68+
# TODO: Implement when LangChain strategy is added
69+
raise NotImplementedError("LangChain agent execution not yet implemented")
70+
71+
72+
@activity.defn(name="crewai_agent_execution")
73+
async def crewai_agent_execution(params: Dict[str, Any]) -> str:
74+
"""
75+
Execute CrewAI agent in proper async activity context (future implementation).
76+
"""
77+
# TODO: Implement when CrewAI strategy is added
78+
raise NotImplementedError("CrewAI agent execution not yet implemented")

src/agentex/lib/core/temporal/agent_platforms/strategies.py

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ class OpenAIExecutionStrategy(AgentExecutionStrategy):
7474
"""OpenAI Agents SDK execution strategy"""
7575

7676
def __init__(self):
77-
from agentex.lib.core.services.adk.providers.openai import OpenAIService
78-
self.openai_service = OpenAIService()
77+
# No dependencies needed - we use OpenAI Agents SDK directly
78+
pass
7979

8080
@property
8181
def platform_name(self) -> str:
@@ -89,43 +89,34 @@ async def execute_agent(
8989
trace_id: str = None
9090
) -> str:
9191
"""
92-
Execute OpenAI agent using direct Agent SDK integration.
92+
Execute OpenAI agent using Temporal activity for proper async context.
9393
94-
This bypasses the existing run_agent_streamed_auto_send activity to use
95-
the OpenAI Agents SDK directly for better performance and durability.
94+
OpenAI Agents SDK requires full async context which isn't available in
95+
Temporal workflow execution. We need to run it as an activity.
9696
"""
9797
try:
98-
# Import OpenAI Agents SDK
99-
from agents import Agent, Runner
100-
101-
# Create agent from config
102-
agent = Agent(**agent_config)
98+
from temporalio import workflow
10399

104-
# Execute agent with user input
105-
logger.debug(f"Executing OpenAI agent for task {task_id} with input: {user_input[:100]}...")
106-
result = await Runner.run(starting_agent=agent, input=user_input)
100+
# Execute OpenAI agent as a Temporal activity for proper async context
101+
logger.debug(f"Executing OpenAI agent activity for task {task_id} with input: {user_input[:100]}...")
107102

108-
# Extract final output
109-
output = result.final_output if hasattr(result, 'final_output') else str(result)
103+
result = await workflow.execute_activity(
104+
activity="openai_agent_execution",
105+
arg={"agent_config": agent_config, "user_input": user_input, "task_id": task_id},
106+
start_to_close_timeout=workflow.timedelta(minutes=5),
107+
)
110108

111109
logger.debug(f"OpenAI agent execution completed for task {task_id}")
112-
return output
110+
return result
113111

114-
except ImportError as e:
115-
logger.error(f"OpenAI Agents SDK not available: {e}")
116-
raise RuntimeError("OpenAI Agents SDK is required for OpenAI strategy") from e
117112
except Exception as e:
118113
logger.error(f"OpenAI agent execution failed for task {task_id}: {e}")
119114
raise
120115

121116
async def create_agent_from_config(self, config: Dict[str, Any]) -> Any:
122117
"""Create OpenAI Agent instance from configuration"""
123-
try:
124-
from agents import Agent
125-
return Agent(**config)
126-
except ImportError as e:
127-
logger.error(f"OpenAI Agents SDK not available: {e}")
128-
raise RuntimeError("OpenAI Agents SDK is required for OpenAI strategy") from e
118+
from agents import Agent
119+
return Agent(**config)
129120

130121
def get_worker_config(self, platform_config: Dict[str, Any]) -> Dict[str, Any]:
131122
"""

0 commit comments

Comments
 (0)