Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 55 additions & 1 deletion examples/tutorials/00_sync/010_multiturn/project/acp.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,80 @@
import os
import json
from typing import AsyncGenerator, Union

from agentex.lib import adk
from agentex.lib.adk.utils._modules.client import create_async_agentex_client
from agentex.lib.sdk.fastacp.fastacp import FastACP
from agentex.lib.types.acp import SendMessageParams
from agentex.lib.types.llm_messages import AssistantMessage, LLMConfig, SystemMessage, UserMessage
from agentex.lib.types.task_message_updates import TaskMessageUpdate
from agentex.types.task_message import TaskMessageContent
from agentex.types.task_message_content import TextContent
from agentex.lib.utils.model_utils import BaseModel
from fastapi.responses import JSONResponse
from fastapi import Request
from agentex.lib.environment_variables import EnvironmentVariables

# Create an ACP server
acp = FastACP.create(
acp_type="sync",
)

agentex_client = create_async_agentex_client()

class StateModel(BaseModel):
system_prompt: str
model: str


@acp.get("/webhook")
@acp.post("/webhook")
async def handle_webhook(request: Request) -> JSONResponse:
agent_id = None
env_vars = EnvironmentVariables.refresh()
if env_vars:
agent_id = env_vars.AGENT_ID
if not agent_id:
# No agent ID found, this agent is not registered with the Agentex Server.
return JSONResponse(content={"message": "This agent is not registered with the Agentex Server."})

task = None
task_name = f"{agent_id}_webhook_test"
try:
task = await adk.tasks.get(task_name=task_name)
except Exception:
# If the task doesn't exist, create it.
task = await agentex_client.agents.rpc(
agent_id=agent_id,
method="task/create",
params={
"name": task_name,
}
)

#print(task)

if not task:
return JSONResponse(content={"message": "Failed to get named task."})

message_data = {
"message": "Incoming webhook request received.",
"request": {
"method": request.method,
"headers": dict(request.headers),
"body": await request.json(),
}
}

# Use TextContent to see the full message in UI
await adk.messages.create(
task_id=task.id,
content=TextContent(
author="agent",
content=json.dumps(message_data, indent=4),
)
)
return JSONResponse(content={"message": "Success"})

# Note: The return of this handler is required to be persisted by the Agentex Server
@acp.on_message_send
async def handle_message_send(
Expand Down
Loading