Skip to content

Commit 47e07db

Browse files
authored
Merge pull request #4 from cyberholics/a2a-customer-routing
new changes from adk and a2a new versions
2 parents 02c1ad8 + 63d64a1 commit 47e07db

File tree

5 files changed

+142
-163
lines changed

5 files changed

+142
-163
lines changed

a2a_customer_routing/README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,6 @@ Response: "I understand your frustration. A human agent will contact you shortly
143143
### Core Classes
144144

145145
- **`KB`**: Knowledge base management with LlamaIndex integration
146-
- **`ADKAgentExecutor`**: Wrapper for Google ADK agents to work with A2A protocol
147146
- **`A2AToolClient`**: Client for agent-to-agent communication
148147

149148
### Agent Tools
Lines changed: 55 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,25 @@
11
import uuid
22
import logging
33
import os
4-
5-
from a2a.server.agent_execution import AgentExecutor, RequestContext
4+
import asyncio
5+
from typing import Any
6+
import httpx
67
from a2a.server.apps import A2AStarletteApplication
7-
from a2a.server.events import EventQueue
88
from a2a.server.request_handlers import DefaultRequestHandler
9-
from a2a.server.tasks import InMemoryTaskStore, TaskUpdater
9+
from a2a.server.tasks import InMemoryTaskStore
1010
from a2a.types import (
11-
AgentCapabilities, AgentCard, AgentSkill, MessageSendParams, Part,
12-
TaskState, TextPart, SendMessageRequest, Message
11+
AgentCapabilities, AgentCard, AgentSkill, Task, TransportProtocol
1312
)
14-
from a2a.utils import new_agent_text_message, new_task
15-
from a2a.client import A2AClient
13+
from a2a.utils.constants import AGENT_CARD_WELL_KNOWN_PATH
14+
from a2a.client import ClientConfig, ClientFactory, create_text_message_object
15+
from google.adk.a2a.executor.a2a_agent_executor import A2aAgentExecutor, A2aAgentExecutorConfig
1616

1717
from google.adk.agents import LlmAgent
1818
from google.adk.runners import Runner
1919
from google.adk.sessions import InMemorySessionService
2020
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
2121
from google.adk.artifacts import InMemoryArtifactService
2222
from google.adk.models.lite_llm import LiteLlm
23-
from google.genai import types
24-
import httpx
25-
2623

2724
from .tools import resolve_query_fn, classify_fn, escalate_fn
2825

@@ -32,7 +29,6 @@
3229

3330
# --- Agent and Model Definitions ---
3431
def create_llm_model(model_name: str):
35-
"""Factory function to create LLM models with consistent configuration."""
3632
api_key = os.getenv("NEBIUS_API_KEY")
3733
return LiteLlm(model=model_name, api_key=api_key, temperature=0.1)
3834

@@ -45,110 +41,65 @@ def create_llm_model(model_name: str):
4541

4642

4743
# --- A2A Server Infrastructure ---
48-
class ADKAgentExecutor(AgentExecutor):
49-
def __init__(self, agent, status_message="Processing...", artifact_name="response"):
50-
self.agent = agent
51-
self.status_message = status_message
52-
self.artifact_name = artifact_name
53-
self.runner = Runner(
54-
app_name=agent.name, agent=agent, artifact_service=InMemoryArtifactService(),
55-
session_service=InMemorySessionService(), memory_service=InMemoryMemoryService()
56-
)
57-
58-
async def cancel(self, task_id: str) -> None: pass
59-
60-
async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
61-
query = context.get_user_input()
62-
task = context.current_task or new_task(context.message)
63-
await event_queue.enqueue_event(task)
64-
updater = TaskUpdater(event_queue, task.id, task.contextId)
65-
try:
66-
await updater.update_status(TaskState.working, new_agent_text_message(self.status_message, task.contextId, task.id))
67-
session = await self.runner.session_service.create_session(app_name=self.agent.name, user_id="a2a_user", session_id=task.contextId)
68-
next_message = types.Content(role='user', parts=[types.Part.from_text(text=query)])
69-
final_answer = "The agent could not produce a final answer."
70-
max_turns = 5
71-
for i in range(max_turns):
72-
logger.info(f"Agent '{self.agent.name}' Turn {i+1}/{max_turns}")
73-
response_content = None
74-
async for event in self.runner.run_async(user_id="a2a_user", session_id=session.id, new_message=next_message):
75-
if event.is_final_response() and event.content:
76-
response_content = event.content
77-
break
78-
if not response_content or not response_content.parts:
79-
final_answer = "Agent produced an empty response."; break
80-
part = response_content.parts[0]
81-
if part.function_response:
82-
tool_output = f"Tool '{part.function_response.name}' returned: {part.function_response.response}"
83-
logger.info(f"Agent '{self.agent.name}' Tool call result: {tool_output[:200]}...")
84-
next_message = types.Content(role='user', parts=[types.Part.from_text(text=tool_output)])
85-
continue
86-
elif part.text is not None:
87-
final_answer = part.text.strip()
88-
logger.info(f"Agent '{self.agent.name}' final answer received."); break
89-
else:
90-
final_answer = "Agent produced an unexpected response type."; break
91-
await updater.add_artifact([Part(root=TextPart(text=final_answer))], name=self.artifact_name)
92-
await updater.complete()
93-
except Exception as e:
94-
logger.error(f"Error in ADKAgentExecutor for agent {self.agent.name}: {e}", exc_info=True)
95-
await updater.update_status(TaskState.failed, new_agent_text_message(f"Error: {e!s}", task.contextId, task.id), final=True)
96-
97-
# --- A2A Agent Server Creation ---
98-
def create_agent_a2a_server(agent, name, description, skills, host, port, status_message, artifact_name):
99-
agent_card = AgentCard(
100-
name=name, description=description, url=f"http://{host}:{port}", version="1.0.0",
101-
defaultInputModes=["text"], defaultOutputModes=["text"],
102-
capabilities=AgentCapabilities(streaming=True), skills=skills
44+
def create_agent_a2a_server(agent: LlmAgent, agent_card: AgentCard):
45+
runner = Runner(
46+
app_name=agent.name, agent=agent, artifact_service=InMemoryArtifactService(),
47+
session_service=InMemorySessionService(), memory_service=InMemoryMemoryService()
10348
)
104-
executor = ADKAgentExecutor(agent=agent, status_message=status_message, artifact_name=artifact_name)
49+
config = A2aAgentExecutorConfig()
50+
executor = A2aAgentExecutor(runner=runner, config=config)
10551
request_handler = DefaultRequestHandler(agent_executor=executor, task_store=InMemoryTaskStore())
10652
return A2AStarletteApplication(agent_card=agent_card, http_handler=request_handler)
10753

108-
# --- Individual Agent Servers ---
10954
def create_intake_agent_server(host="127.0.0.1", port=10020):
110-
return create_agent_a2a_server(agent=intake_agent, name="Sentiment Agent", description="Analyzes message sentiment.", skills=[AgentSkill(id="classify_sentiment", name="Classify Sentiment", description="Determines message sentiment.", tags=["sentiment"])], host=host, port=port, status_message="Analyzing sentiment...", artifact_name="sentiment_result")
55+
card = AgentCard(name="Sentiment Agent", description="Analyzes message sentiment.", url=f"http://{host}:{port}", version="1.0.0", defaultInputModes=["text"], defaultOutputModes=["text"], capabilities=AgentCapabilities(streaming=True), skills=[AgentSkill(id="classify_sentiment", name="Classify Sentiment", description="Determines message sentiment.", tags=["sentiment"])], preferred_transport=TransportProtocol.jsonrpc)
56+
return create_agent_a2a_server(agent=intake_agent, agent_card=card)
57+
11158
def create_resolution_agent_server(host="127.0.0.1", port=10021):
112-
return create_agent_a2a_server(agent=resolution_agent, name="KB Agent", description="Answers questions using a knowledge base.", skills=[AgentSkill(id="resolve_question", name="Resolve Question", description="Searches KB for answers.", tags=["knowledge", "support"])], host=host, port=port, status_message="Searching knowledge base...", artifact_name="kb_answer")
59+
card = AgentCard(name="KB Agent", description="Answers questions using a knowledge base.", url=f"http://{host}:{port}", version="1.0.0", defaultInputModes=["text"], defaultOutputModes=["text"], capabilities=AgentCapabilities(streaming=True), skills=[AgentSkill(id="resolve_question", name="Resolve Question", description="Searches KB for answers.", tags=["knowledge", "support"])], preferred_transport=TransportProtocol.jsonrpc)
60+
return create_agent_a2a_server(agent=resolution_agent, agent_card=card)
61+
11362
def create_escalation_agent_server(host="127.0.0.1", port=10022):
114-
return create_agent_a2a_server(agent=escalation_agent, name="Escalation Agent", description="Escalates issues to human support.", skills=[AgentSkill(id="escalate_issue", name="Escalate Issue", description="Forwards issues to humans.", tags=["escalation", "human"])], host=host, port=port, status_message="Escalating to human support...", artifact_name="escalation_result")
63+
card = AgentCard(name="Escalation Agent", description="Escalates issues to human support.", url=f"http://{host}:{port}", version="1.0.0", defaultInputModes=["text"], defaultOutputModes=["text"], capabilities=AgentCapabilities(streaming=True), skills=[AgentSkill(id="escalate_issue", name="Escalate Issue", description="Forwards issues to humans.", tags=["escalation", "human"])], preferred_transport=TransportProtocol.jsonrpc)
64+
return create_agent_a2a_server(agent=escalation_agent, agent_card=card)
11565

11666

11767
# --- Coordinator Agent & Client ---
11868
class A2AToolClient:
11969
def __init__(self, default_timeout: float = 120.0):
120-
self._agent_info_cache: dict[str, dict[str, any] | None] = {}
70+
self._agent_info_cache: dict[str, Any | None] = {}
12171
self.default_timeout = default_timeout
122-
def add_remote_agent(self, agent_url: str):
123-
normalized_url = agent_url.rstrip('/')
124-
if normalized_url not in self._agent_info_cache: self._agent_info_cache[normalized_url] = None
72+
12573
async def create_task(self, agent_url: str, message: str) -> str:
12674
timeout_config = httpx.Timeout(self.default_timeout)
12775
async with httpx.AsyncClient(timeout=timeout_config) as httpx_client:
128-
agent_card_response = await httpx_client.get(f"{agent_url}/.well-known/agent.json")
76+
agent_card_response = await httpx_client.get(f"{agent_url}{AGENT_CARD_WELL_KNOWN_PATH}")
12977
agent_card_response.raise_for_status()
13078
agent_card = AgentCard(**agent_card_response.json())
131-
client = A2AClient(httpx_client=httpx_client, agent_card=agent_card)
132-
79+
80+
config = ClientConfig(httpx_client=httpx_client)
81+
factory = ClientFactory(config)
82+
client = factory.create(agent_card)
83+
84+
message_obj = create_text_message_object(content=message)
85+
final_response = "Agent did not return a valid response."
86+
87+
async for response in client.send_message(message_obj):
88+
if isinstance(response, tuple) and len(response) > 0:
89+
task: Task = response[0]
90+
if task.artifacts:
91+
try:
92+
text_response = task.artifacts[0].parts[0].root.text
93+
if text_response:
94+
final_response = text_response.strip()
95+
break
96+
except (AttributeError, IndexError):
97+
logger.warning(f"Could not extract text from task artifact for {agent_url}")
98+
final_response = f"Agent at {agent_url} returned an unreadable response."
99+
else:
100+
logger.warning(f"Received an unexpected response format from {agent_url}: {response}")
133101

134-
message_payload = Message(
135-
messageId=f"msg_{uuid.uuid4()}",
136-
role='user',
137-
parts=[TextPart(text=message)]
138-
)
139-
send_params = MessageSendParams(message=message_payload)
140-
request = SendMessageRequest(
141-
id=f"req_{uuid.uuid4()}",
142-
params=send_params
143-
)
144-
145-
response = await client.send_message(request)
146-
response_dict = response.model_dump(mode='json', exclude_none=True)
147-
if 'result' in response_dict and 'artifacts' in response_dict['result']:
148-
for artifact in response_dict['result']['artifacts']:
149-
for part in artifact.get('parts', []):
150-
if 'text' in part and part['text'].strip(): return part['text'].strip()
151-
return "Agent did not return a text artifact."
102+
return final_response
152103

153104
coordinator_a2a_client = A2AToolClient()
154105

@@ -157,20 +108,20 @@ def create_coordinator_agent_with_registered_agents():
157108
name="support_coordinator", model=llama_8b, description="Routes user messages to other agents.",
158109
instruction="""You are an expert support coordinator. Your job is to orchestrate other agents to resolve a user's request.
159110
160-
Follow this exact workflow:
161-
1. **Analyze Sentiment:** Use the `create_task` tool to call the Intake Agent (at http://127.0.0.1:10020) with the user's original message. This tool will return a sentiment classification.
111+
Follow this exact workflow step-by-step:
112+
1. **Analyze Sentiment:** Use the `create_task` tool to call the Intake Agent (at http://127.0.0.1:10020) with the user's original message. The tool will return a sentiment classification.
162113
2. **Route Request:**
163-
* **If the result from the Intake Agent contains the word "negative"**, use `create_task` to call the Escalation Agent (at http://127.0.0.1:10022) with the user's original message.
164-
* Otherwise (for "positive" or "neutral"), use `create_task` to call the Resolution Agent (at http://127.0.0.1:10021) with the user's original message.
165-
3. **Return Final Answer:** Your final answer must be ONLY the text returned by the chosen agent (Resolution or Escalation). Do not add any of your own commentary, summaries, or phrases like "The final answer is:".
114+
* If the result from the Intake Agent contains the word "negative", use the `create_task` tool to call the Escalation Agent (at http://127.0.0.1:10022) with the user's original message.
115+
* Otherwise (for "positive" or "neutral"), use the `create_task` tool to call the Resolution Agent (at http://127.0.0.1:10021) with the user's original message.
116+
3. **Finalize and Respond:** The tool used in the previous step will return the final answer. Your final job is to output that exact text as your own final answer. Do not add any of your own commentary, summaries, or phrases like "The final answer is:". Just return the text you received.
166117
""",
167118
tools=[coordinator_a2a_client.create_task]
168119
)
169120

170-
171121
coordinator_agent = None
172122

173123
def create_coordinator_agent_server(host="127.0.0.1", port=10023):
174124
global coordinator_agent
175125
if coordinator_agent is None: raise ValueError("Coordinator agent not initialized.")
176-
return create_agent_a2a_server(agent=coordinator_agent, name="Support Coordinator", description="Orchestrates customer support.", skills=[AgentSkill(id="coordinate_support", name="Coordinate Support", description="Routes customer message to the right agent.", tags=["routing", "sentiment"])], host=host, port=port, status_message="Coordinating request...", artifact_name="support_response")
126+
card = AgentCard(name="Support Coordinator", description="Orchestrates customer support.", url=f"http://{host}:{port}", version="1.0.0", defaultInputModes=["text"], defaultOutputModes=["text"], capabilities=AgentCapabilities(streaming=True), skills=[AgentSkill(id="coordinate_support", name="Coordinate Support", description="Routes customer message to the right agent.", tags=["routing", "sentiment"])], preferred_transport=TransportProtocol.jsonrpc)
127+
return create_agent_a2a_server(agent=coordinator_agent, agent_card=card)

a2a_customer_routing/multi_agent/run_agents.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import requests
77
from . import agent as agent_module
88

9+
from a2a.utils.constants import AGENT_CARD_WELL_KNOWN_PATH
10+
911
logging.basicConfig(level=logging.INFO)
1012
logger = logging.getLogger(__name__)
1113

@@ -40,7 +42,8 @@ def wait_for_agents(agent_urls, timeout=45):
4042
for url in agent_urls:
4143
if url in ready_agents: continue
4244
try:
43-
if requests.get(f"{url}/.well-known/agent.json", timeout=1).status_code == 200:
45+
health_check_url = f"{url}{AGENT_CARD_WELL_KNOWN_PATH}"
46+
if requests.get(health_check_url, timeout=1).status_code == 200:
4447
logger.info(f" ✅ Agent at {url} is ready.")
4548
ready_agents.add(url)
4649
except requests.ConnectionError: pass
@@ -54,7 +57,6 @@ def wait_for_agents(agent_urls, timeout=45):
5457
def start_all_agents():
5558
"""Start all support agents and the coordinator. This is the main entry point."""
5659

57-
5860
support_agents_to_start = {
5961
"Intake": (agent_module.create_intake_agent_server, 10020),
6062
"Resolution": (agent_module.create_resolution_agent_server, 10021),
@@ -64,9 +66,6 @@ def start_all_agents():
6466
support_agent_urls = [f"http://127.0.0.1:{port}" for _, port in support_agents_to_start.values()]
6567
wait_for_agents(support_agent_urls)
6668

67-
68-
for url in support_agent_urls:
69-
agent_module.coordinator_a2a_client.add_remote_agent(url)
7069

7170
agent_module.coordinator_agent = agent_module.create_coordinator_agent_with_registered_agents()
7271
threads["Coordinator"] = run_agent_in_background(agent_module.create_coordinator_agent_server, 10023, "Coordinator")

a2a_customer_routing/requirements.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
a2a-sdk==0.2.8
2-
google-adk==1.2.1
1+
a2a-sdk==0.3.0
2+
google-adk==1.11.0
33
google-genai==1.19.0
44
httpx==0.28.1
55
litellm==1.72.2

0 commit comments

Comments
 (0)