Skip to content

Commit 9665b81

Browse files
Merge pull request #98 from scaleapi/add-activities-template
feat(templates): add custom temporal activity timeout guidance
2 parents 38f882b + 7b956eb commit 9665b81

File tree

3 files changed

+98
-3
lines changed

3 files changed

+98
-3
lines changed

src/agentex/lib/cli/templates/temporal/README.md.j2

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -262,14 +262,30 @@ class MyWorkflow(BaseWorkflow):
262262
```
263263

264264
### Custom Activities
265-
Add custom activities for external operations:
265+
Add custom activities for external operations. **Important**: Always specify appropriate timeouts (recommended: 10 minutes):
266266

267267
```python
268268
# In project/activities.py
269-
@activity.defn
269+
from datetime import timedelta
270+
from temporalio import activity
271+
from temporalio.common import RetryPolicy
272+
273+
@activity.defn(name="call_external_api")
270274
async def call_external_api(data):
271275
# HTTP requests, database operations, etc.
272276
pass
277+
278+
# In your workflow, call it with a timeout:
279+
result = await workflow.execute_activity(
280+
"call_external_api",
281+
data,
282+
start_to_close_timeout=timedelta(minutes=10), # Recommended: 10 minute timeout
283+
heartbeat_timeout=timedelta(minutes=1), # Optional: heartbeat monitoring
284+
retry_policy=RetryPolicy(maximum_attempts=3) # Optional: retry policy
285+
)
286+
287+
# Don't forget to register your custom activities in run_worker.py:
288+
# all_activities = get_all_activities() + [your_custom_activity_function]
273289
```
274290

275291
### Integration with External Services
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
"""
2+
Custom Temporal Activities Template
3+
====================================
4+
This file is for defining custom Temporal activities that can be executed
5+
by your workflow. Activities are used for:
6+
- External API calls
7+
- Database operations
8+
- File I/O operations
9+
- Heavy computations
10+
- Any non-deterministic operations
11+
12+
IMPORTANT: All activities should have appropriate timeouts!
13+
Default recommendation: start_to_close_timeout=timedelta(minutes=10)
14+
"""
15+
16+
from datetime import timedelta
17+
from typing import Any, Dict
18+
19+
from pydantic import BaseModel
20+
from temporalio import activity
21+
from temporalio.common import RetryPolicy
22+
23+
from agentex.lib.utils.logging import make_logger
24+
25+
logger = make_logger(__name__)
26+
27+
28+
# Example activity parameter models
29+
class ExampleActivityParams(BaseModel):
30+
"""Parameters for the example activity"""
31+
data: Dict[str, Any]
32+
task_id: str
33+
34+
35+
# Example custom activity
36+
@activity.defn(name="example_custom_activity")
37+
async def example_custom_activity(params: ExampleActivityParams) -> Dict[str, Any]:
38+
"""
39+
Example custom activity that demonstrates best practices.
40+
41+
When calling this activity from your workflow, use:
42+
```python
43+
result = await workflow.execute_activity(
44+
"example_custom_activity",
45+
ExampleActivityParams(data={"key": "value"}, task_id=task_id),
46+
start_to_close_timeout=timedelta(minutes=10), # Recommended: 10 minute timeout
47+
heartbeat_timeout=timedelta(minutes=1), # Optional: heartbeat every minute
48+
retry_policy=RetryPolicy(maximum_attempts=3) # Optional: retry up to 3 times
49+
)
50+
```
51+
"""
52+
logger.info(f"Processing activity for task {params.task_id} with data: {params.data}")
53+
54+
# Your activity logic here
55+
# This could be:
56+
# - API calls
57+
# - Database operations
58+
# - File processing
59+
# - ML model inference
60+
# - etc.
61+
62+
result = {
63+
"status": "success",
64+
"processed_data": params.data,
65+
"task_id": params.task_id
66+
}
67+
68+
return result
69+
70+
71+
# Add more custom activities below as needed
72+
# Remember to:
73+
# 1. Use appropriate timeouts (default: 10 minutes)
74+
# 2. Define clear parameter models with Pydantic
75+
# 3. Handle errors appropriately
76+
# 4. Use logging for debugging
77+
# 5. Keep activities focused on a single responsibility

src/agentex/lib/cli/templates/temporal/project/run_worker.py.j2

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@ async def main():
2222
if task_queue_name is None:
2323
raise ValueError("WORKFLOW_TASK_QUEUE is not set")
2424

25+
all_activities = get_all_activities() + [] # add your own activities here
26+
2527
# Create a worker with automatic tracing
2628
worker = AgentexWorker(
2729
task_queue=task_queue_name,
2830
)
2931

3032
await worker.run(
31-
activities=get_all_activities(),
33+
activities=all_activities,
3234
workflow={{ workflow_class }},
3335
)
3436

0 commit comments

Comments
 (0)