Skip to content

Commit a4bcc76

Browse files
author
Charlotte Zhuang
committed
DO NOT MERGE: custom activity examples
1 parent df8429c commit a4bcc76

File tree

3 files changed

+133
-0
lines changed

3 files changed

+133
-0
lines changed
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import asyncio
2+
from typing import Any, Dict, List
3+
4+
from temporalio import activity
5+
6+
from agentex.lib.utils.logging import make_logger
7+
8+
logger = make_logger(__name__)
9+
10+
11+
@activity.defn
12+
async def process_special_data(data: Dict[str, Any]) -> Dict[str, Any]:
13+
"""Custom activity for processing special data in the custom task queue."""
14+
logger.info(f"Processing special data: {data}")
15+
16+
# Simulate some processing
17+
await asyncio.sleep(0.1)
18+
19+
return {
20+
"processed": True,
21+
"original_data": data,
22+
"result": f"Processed {len(data)} items"
23+
}
24+
25+
26+
@activity.defn
27+
async def validate_custom_input(input_data: str) -> bool:
28+
"""Custom activity for validating input in the custom task queue."""
29+
logger.info(f"Validating input: {input_data}")
30+
31+
# Simple validation logic
32+
is_valid = len(input_data) > 0 and not input_data.isspace()
33+
34+
return is_valid
35+
36+
37+
@activity.defn
38+
async def transform_message(message: str, transform_type: str = "uppercase") -> str:
39+
"""Custom activity for transforming messages in various ways."""
40+
logger.info(f"Transforming message with type: {transform_type}")
41+
42+
if transform_type == "uppercase":
43+
return message.upper()
44+
elif transform_type == "lowercase":
45+
return message.lower()
46+
elif transform_type == "reverse":
47+
return message[::-1]
48+
else:
49+
return message
50+
51+
52+
def get_custom_activities() -> List[Any]:
53+
"""Return all custom activities for the special task queue."""
54+
return [
55+
process_special_data,
56+
validate_custom_input,
57+
transform_message,
58+
]
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import asyncio
2+
3+
from agentex.lib.core.temporal.workers.worker import AgentexWorker
4+
from agentex.lib.environment_variables import EnvironmentVariables
5+
from agentex.lib.utils.logging import make_logger
6+
7+
from custom_activities import get_custom_activities
8+
from workflow import At010AgentChatWorkflow
9+
10+
environment_variables = EnvironmentVariables.refresh()
11+
12+
logger = make_logger(__name__)
13+
14+
CUSTOM_TASK_QUEUE_NAME = "custom-activities-queue"
15+
16+
17+
async def main() -> None:
18+
# Create a worker for the custom task queue
19+
worker = AgentexWorker(
20+
task_queue=CUSTOM_TASK_QUEUE_NAME,
21+
)
22+
23+
await worker.run(
24+
activities=get_custom_activities(),
25+
workflow=At010AgentChatWorkflow,
26+
)
27+
28+
29+
if __name__ == "__main__":
30+
asyncio.run(main())

examples/tutorials/10_agentic/10_temporal/010_agent_chat/project/workflow.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,51 @@ async def on_task_event_send(self, params: SendEventParams) -> None:
105105
parent_span_id=span.id if span else None,
106106
)
107107

108+
# Example: Use custom activities from the special task queue
109+
user_message = params.event.content.content
110+
111+
# Validate the input using custom activity
112+
is_valid = await workflow.execute_activity(
113+
"validate_custom_input",
114+
user_message,
115+
task_queue="custom-activities-queue",
116+
)
117+
118+
if is_valid:
119+
# Transform the message using custom activity
120+
transformed = await workflow.execute_activity(
121+
"transform_message",
122+
args=[user_message, "uppercase"],
123+
task_queue="custom-activities-queue",
124+
)
125+
126+
# Process some custom data
127+
processed_result = await workflow.execute_activity(
128+
"process_special_data",
129+
{"message": transformed, "turn": self._state.turn_number if self._state else 0},
130+
task_queue="custom-activities-queue",
131+
)
132+
133+
await adk.messages.create(
134+
task_id=params.task.id,
135+
trace_id=params.task.id,
136+
content=TextContent(
137+
author="agent",
138+
content=f"Custom processing result: {processed_result}",
139+
),
140+
parent_span_id=span.id if span else None,
141+
)
142+
else:
143+
await adk.messages.create(
144+
task_id=params.task.id,
145+
trace_id=params.task.id,
146+
content=TextContent(
147+
author="agent",
148+
content="Invalid input received.",
149+
),
150+
parent_span_id=span.id if span else None,
151+
)
152+
108153
# Call an LLM to respond to the user's message
109154
# When send_as_agent_task_message=True, returns a TaskMessage
110155
run_result = await adk.providers.openai.run_agent_streamed_auto_send(

0 commit comments

Comments
 (0)