Commit 0120bb5

Adding custom activities
1 parent c456941 commit 0120bb5

5 files changed: +461 additions, -27 deletions
Lines changed: 90 additions & 0 deletions
@@ -0,0 +1,90 @@
+import asyncio
+from typing import List, Any
+
+from pydantic import BaseModel
+from temporalio import activity
+from agentex.lib.utils.logging import make_logger
+from agentex.lib import adk
+from agentex.types.text_content import TextContent
+
+logger = make_logger(__name__)
+
+
+PROCESS_BATCH_EVENTS_ACTIVITY = "process_batch_events"
+
+
+class ProcessBatchEventsActivityParams(BaseModel):
+    events: List[Any]
+    batch_number: int
+
+
+REPORT_PROGRESS_ACTIVITY = "report_progress"
+
+
+class ReportProgressActivityParams(BaseModel):
+    task_id: str  # task to post the progress update message to
+    num_batches_processed: int
+    num_batches_failed: int
+    num_batches_running: int
+
+
+class CustomActivities:
+    def __init__(self):
+        self._batch_size = 5
+
+    @activity.defn(name=PROCESS_BATCH_EVENTS_ACTIVITY)
+    async def process_batch_events(self, params: ProcessBatchEventsActivityParams) -> bool:
+        """
+        Take a list of events and process them.
+
+        This is a simple example that demonstrates how to:
+        1. Create a custom Temporal activity
+        2. Accept structured parameters via Pydantic models
+        3. Process batched data
+        4. Simulate work with async sleep
+        5. Return results back to the workflow
+
+        In a real-world scenario, you could:
+        - Make database calls (batch inserts, updates)
+        - Call external APIs (payment processing, email sending)
+        - Perform heavy computations (ML model inference, data analysis)
+        - Generate reports or files
+        - Run any other business logic that benefits from Temporal's reliability
+
+        The key benefit is that this activity will automatically:
+        - Retry on failures (with configurable retry policies)
+        - Be durable across worker restarts
+        - Provide observability and metrics
+        - Handle timeouts and cancellations gracefully
+        """
+        logger.info(f"[Batch {params.batch_number}] 🚀 Starting to process batch of {len(params.events)} events")
+
+        # Process each event with some simulated work
+        for i, event in enumerate(params.events):
+            logger.info(f"[Batch {params.batch_number}] 📄 Processing event {i+1}/{len(params.events)}: {event}")
+
+            # Simulate processing time - in reality this could be database
+            # operations, API calls, file processing, ML inference, etc.
+            await asyncio.sleep(2)
+
+            logger.info(f"[Batch {params.batch_number}] ✅ Event {i+1} processed successfully")
+
+        logger.info(f"[Batch {params.batch_number}] 🎉 Batch processing complete! Processed {len(params.events)} events")
+
+        # Return success - in reality you might return processing results, IDs, stats, etc.
+        return True
+
+    @activity.defn(name=REPORT_PROGRESS_ACTIVITY)
+    async def report_progress(self, params: ReportProgressActivityParams) -> None:
+        """
+        Report progress to an external system.
+
+        Normally this would be a call to an external system, for example an
+        email service that sends an update email to the user. In this example,
+        we just log the progress and post a message to the task.
+        """
+        logger.info(f"📊 Progress Update - num_batches_processed: {params.num_batches_processed}, num_batches_failed: {params.num_batches_failed}, num_batches_running: {params.num_batches_running}")
+
+        await adk.messages.create(
+            task_id=params.task_id,
+            content=TextContent(
+                author="agent",
+                content=f"📊 Progress Update - num_batches_processed: {params.num_batches_processed}, num_batches_failed: {params.num_batches_failed}, num_batches_running: {params.num_batches_running}",
+            ),
+        )
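The retry, durability, and timeout guarantees listed in the docstring apply when the activity is invoked through Temporal rather than called directly. Below is a minimal sketch of such an invocation from workflow code; the module path project.custom_activities, the timeout, and the retry values are illustrative assumptions (file names are not shown in this view), and it presumes the worker's data converter can serialize Pydantic models.

from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy

# Hypothetical module path for the file added above.
from project.custom_activities import (
    PROCESS_BATCH_EVENTS_ACTIVITY,
    ProcessBatchEventsActivityParams,
)


async def run_batch(events: list, batch_number: int) -> bool:
    # Must be called from inside a workflow. Referencing the activity by its
    # registered name keeps the workflow decoupled from the implementation.
    return await workflow.execute_activity(
        PROCESS_BATCH_EVENTS_ACTIVITY,
        ProcessBatchEventsActivityParams(events=events, batch_number=batch_number),
        start_to_close_timeout=timedelta(minutes=5),  # assumed budget per batch
        retry_policy=RetryPolicy(maximum_attempts=3),  # assumed retry cap
    )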
Lines changed: 14 additions & 0 deletions
@@ -0,0 +1,14 @@
+from pydantic import BaseModel
+
+
+class StateModel(BaseModel):
+    num_batches_processed: int = 0
+    num_batches_failed: int = 0
+    total_events_processed: int = 0
+    total_events_dropped: int = 0
+    total_events_enqueued: int = 0
+
+
+class IncomingEventData(BaseModel):
+    clear_queue: bool = False
+    cancel_running_tasks: bool = False
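These two models are shared between the workflow and its helpers: StateModel carries the running counters and IncomingEventData carries the control flags a client can send as a data event. A small sketch of how validation behaves, assuming the raw payload arrives as a plain dict (as the signal handler in the workflow file expects): missing flags fall back to False, and a malformed payload raises, which the handler catches and drops.

from project.shared_models import IncomingEventData, StateModel

# Flags default to False, so an empty payload is a harmless no-op event.
event = IncomingEventData.model_validate({"clear_queue": True})
assert event.clear_queue is True
assert event.cancel_running_tasks is False

# Counters all start at zero and are mutated in place as batches complete.
state = StateModel()
state.num_batches_processed += 1
state.total_events_processed += 5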
Lines changed: 138 additions & 26 deletions
@@ -1,6 +1,8 @@
-import json
+import asyncio
+from typing import List, Any, override
 
 from temporalio import workflow
+from pydantic import BaseModel
 
 from agentex.lib import adk
 from agentex.lib.types.acp import CreateTaskParams, SendEventParams
@@ -10,57 +12,167 @@
 from agentex.types.text_content import TextContent
 from agentex.lib.environment_variables import EnvironmentVariables
 
+
+from project.workflow_utils import BatchProcessingUtils
+from project.shared_models import StateModel, IncomingEventData
+
+
 environment_variables = EnvironmentVariables.refresh()
 
 if environment_variables.WORKFLOW_NAME is None:
     raise ValueError("Environment variable WORKFLOW_NAME is not set")
 
-if environment_variables.AGENT_NAME is None:
+if not environment_variables.AGENT_NAME:
     raise ValueError("Environment variable AGENT_NAME is not set")
 
 logger = make_logger(__name__)
 
+
+WAIT_TIMEOUT = 300
+BATCH_SIZE = 5
+MAX_QUEUE_DEPTH = 50
+
+
 @workflow.defn(name=environment_variables.WORKFLOW_NAME)
 class At030CustomActivitiesWorkflow(BaseWorkflow):
     """
-    Minimal async workflow template for AgentEx Temporal agents.
+    Simple tutorial workflow demonstrating custom activities with concurrent processing.
+
+    Key Learning Points:
+    1. Queue incoming events using Temporal signals
+    2. Process events in batches when enough arrive
+    3. Use asyncio.create_task() for concurrent processing
+    4. Execute custom activities from within workflows
+    5. Handle workflow completion cleanly
     """
     def __init__(self):
         super().__init__(display_name=environment_variables.AGENT_NAME)
-        self._complete_task = False
+        self._incoming_queue: asyncio.Queue[Any] = asyncio.Queue()
+        self._processing_tasks: List[asyncio.Task[Any]] = []
+        self._batch_size = BATCH_SIZE
+        self._state: StateModel | None = None  # set when the task is created
 
     @workflow.signal(name=SignalName.RECEIVE_EVENT)
+    @override
     async def on_task_event_send(self, params: SendEventParams) -> None:
-        logger.info(f"Received task message instruction: {params}")
-
-        # 2. Echo back the client's message to show it in the UI. This is not done by default so the agent developer has full control over what is shown to the user.
-        await adk.messages.create(task_id=params.task.id, content=params.event.content)
+        if params.event.content is None:
+            return
+
+        if params.event.content.type == "text":
+            if self._incoming_queue.qsize() >= MAX_QUEUE_DEPTH:
+                logger.warning(f"Queue is at max depth of {MAX_QUEUE_DEPTH}. Dropping event.")
+                if self._state is not None:
+                    self._state.total_events_dropped += 1
+            else:
+                await self._incoming_queue.put(params.event.content)
+            return
 
-        # 3. Send a simple response message.
-        # In future tutorials, this is where we'll add more sophisticated response logic.
-        await adk.messages.create(
-            task_id=params.task.id,
-            content=TextContent(
-                author="agent",
-                content=f"Hello! I've received your message. I can't respond right now, but in future tutorials we'll see how you can get me to intelligently respond to your message.",
-            ),
-        )
+        elif params.event.content.type == "data":
+            received_data = params.event.content.data
+            try:
+                received_data = IncomingEventData.model_validate(received_data)
+            except Exception as e:
+                logger.error(f"Error parsing received data: {e}. Dropping event.")
+                return
+
+            if received_data.clear_queue:
+                await BatchProcessingUtils.handle_queue_clear(self._incoming_queue, params.task.id)
+            elif received_data.cancel_running_tasks:
+                await BatchProcessingUtils.handle_task_cancellation(self._processing_tasks, params.task.id)
+            else:
+                logger.info(f"Received IncomingEventData: {received_data} with no known action.")
+        else:
+            logger.info(f"Received event: {params.event.content} with no action.")
 
     @workflow.run
-    async def on_task_create(self, params: CreateTaskParams) -> str:
+    @override
+    async def on_task_create(self, params: CreateTaskParams) -> None:
         logger.info(f"Received task create params: {params}")
-
-        # 1. Acknowledge that the task has been created.
+
+        self._state = StateModel()
         await adk.messages.create(
             task_id=params.task.id,
             content=TextContent(
                 author="agent",
-                content=f"Hello! I've received your task. Normally you can do some state initialization here, or just pass and do nothing until you get your first event. For now I'm just acknowledging that I've received a task with the following params:\n\n{json.dumps(params.params, indent=2)}.\n\nYou should only see this message once, when the task is created. All subsequent events will be handled by the `on_task_event_send` handler.",
+                content=f"🚀 Starting batch processing! I'll collect events into batches of {self._batch_size} and process them using custom activities.",
             ),
         )
 
-        await workflow.wait_condition(
-            lambda: self._complete_task,
-            timeout=None,  # Set a timeout if you want to prevent the task from running indefinitely. Generally this is not needed. Temporal can run hundreds of millions of workflows in parallel and more. Only do this if you have a specific reason to do so.
-        )
-        return "Task completed"
+        # Simple event processing loop with progress tracking
+        while True:
+            # Check for completed tasks and update progress
+            self._processing_tasks = await BatchProcessingUtils.update_progress(self._processing_tasks, self._state, params.task.id)
+
+            # Wait for enough events to form a batch, or time out
+            try:
+                await workflow.wait_condition(
+                    lambda: self._incoming_queue.qsize() >= self._batch_size,
+                    timeout=WAIT_TIMEOUT,
+                )
+            except asyncio.TimeoutError:
+                logger.info(f"⏰ Timeout after {WAIT_TIMEOUT} seconds - ending workflow")
+                break
+
+            # We have enough events - start processing them as a batch
+            data_to_process: List[Any] = []
+            await BatchProcessingUtils.dequeue_pending_data(self._incoming_queue, data_to_process, self._batch_size)
+
+            if data_to_process:
+                batch_number = len(self._processing_tasks) + 1  # Number this batch based on total started
+
+                await adk.messages.create(
+                    task_id=params.task.id,
+                    content=TextContent(
+                        author="agent",
+                        content=f"📦 Starting batch #{batch_number} with {len(data_to_process)} events using asyncio.create_task()",
+                    ),
+                )
+
+                # Create a concurrent task for this batch - this is the key learning point!
+                task = asyncio.create_task(
+                    BatchProcessingUtils.process_batch_concurrent(
+                        events=data_to_process,
+                        batch_number=batch_number,
+                        task_id=params.task.id,
+                    )
+                )
+                self._processing_tasks.append(task)
+
+                logger.info(f"📝 Tutorial Note: Created asyncio.create_task() for batch #{batch_number} to run asynchronously")
+
+                # Check progress again immediately to show real-time updates
+                self._processing_tasks = await BatchProcessingUtils.update_progress(self._processing_tasks, self._state, params.task.id)
+
+        # Process any remaining events that didn't form a complete batch
+        if self._incoming_queue.qsize() > 0:
+            data_to_process = []
+            await BatchProcessingUtils.dequeue_pending_data(self._incoming_queue, data_to_process, self._incoming_queue.qsize())
+
+            await adk.messages.create(
+                task_id=params.task.id,
+                content=TextContent(
+                    author="agent",
+                    content=f"🔄 Processing final {len(data_to_process)} events that didn't form a complete batch.",
+                ),
+            )
+
+            # Now, add another batch to process the remaining events
+            batch_number = len(self._processing_tasks) + 1
+            task = asyncio.create_task(
+                BatchProcessingUtils.process_batch_concurrent(
+                    events=data_to_process,
+                    batch_number=batch_number,
+                    task_id=params.task.id,
+                )
+            )
+            self._processing_tasks.append(task)
+
+        # Wait for all remaining tasks to complete, with real-time progress updates
+        await BatchProcessingUtils.wait_for_remaining_tasks(self._processing_tasks, self._state, params.task.id)
+
+        # Final summary with complete statistics
+        await BatchProcessingUtils.send_final_summary(self._state, params.task.id)
+        return
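For the custom activities to be executable, the worker that runs this workflow must also register them. The wiring below is a rough sketch using the plain Temporal SDK; the module paths, task queue name, and server address are assumptions, and the agentex tooling may provide its own worker bootstrap that replaces this.

import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

from project.custom_activities import CustomActivities  # hypothetical path
from project.workflow import At030CustomActivitiesWorkflow  # hypothetical path


async def main() -> None:
    client = await Client.connect("localhost:7233")  # assumed server address
    custom_activities = CustomActivities()
    worker = Worker(
        client,
        task_queue="at030-custom-activities",  # assumed task queue name
        workflows=[At030CustomActivitiesWorkflow],
        # Bound methods are registered so both activities share the instance.
        activities=[
            custom_activities.process_batch_events,
            custom_activities.report_progress,
        ],
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())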
