Commit 1ab6803

feat: Finish client
1 parent 066b2e8 commit 1ab6803

File tree

5 files changed, +132 -81 lines changed

examples/no-llm-framework/src/no_llm_framework/client/__main__.py

Lines changed: 21 additions & 42 deletions
@@ -1,64 +1,43 @@
 import asyncio
 from typing import Literal
-from uuid import uuid4
+


 import asyncclick as click
-import httpx
-from a2a.client import A2ACardResolver, A2AClient
-from a2a.types import (
-    Message,
-    MessageSendParams,
-    Part,
-    Role,
-    SendStreamingMessageRequest,
-    SendStreamingMessageSuccessResponse,
-    TaskStatusUpdateEvent,
-    TextPart,
-)
+import colorama
+from no_llm_framework.client.agent import Agent


 @click.command()
 @click.option('--host', 'host', default='localhost')
 @click.option('--port', 'port', default=9999)
 @click.option('--mode', 'mode', default='streaming')
+@click.option('--question', 'question', required=True)
 async def a_main(
-    host: str, port: int, mode: Literal['completion', 'streaming']
+    host: str,
+    port: int,
+    mode: Literal['completion', 'streaming'],
+    question: str,
 ):
     """Main function to run the A2A Repo Agent client.

     Args:
         host (str): The host address to run the server on.
         port (int): The port number to run the server on.
         mode (Literal['completion', 'streaming']): The mode to run the server on.
+        question (str): The question to ask the Agent.
     """  # noqa: E501
-    async with httpx.AsyncClient() as httpx_client:
-        card_resolver = A2ACardResolver(httpx_client, f'http://{host}:{port}')
-        agent_card = await card_resolver.get_agent_card()
-        agent_card.url = f'http://{host}:{port}'
-
-        client = A2AClient(httpx_client, agent_card=agent_card)
-
-        message = MessageSendParams(
-            message=Message(
-                role=Role.user,
-                parts=[Part(TextPart(text='What is Google A2A?'))],
-                messageId=uuid4().hex,
-                taskId=uuid4().hex,
-            )
-        )
-
-        if mode == 'completion':
-            raise NotImplementedError('Completion mode not implemented')
-
-        streaming_request = SendStreamingMessageRequest(params=message)
-        stream_response = client.send_message_streaming(streaming_request)
-        async for chunk in stream_response:
-            if isinstance(
-                chunk.root, SendStreamingMessageSuccessResponse
-            ) and isinstance(chunk.root.result, TaskStatusUpdateEvent):
-                message = chunk.root.result.status.message
-                if message:
-                    print(message.parts[0].root.text, end='', flush=True)
+    agent = Agent(
+        mode='stream',
+        token_stream_callback=None,
+        agent_urls=[f'http://{host}:{port}/'],
+    )
+    async for chunk in agent.stream(question):
+        if chunk.startswith('<Agent name="'):
+            print(colorama.Fore.CYAN + chunk, end='', flush=True)
+        elif chunk.startswith('</Agent>'):
+            print(colorama.Fore.RESET + chunk, end='', flush=True)
+        else:
+            print(chunk, end='', flush=True)


 def main() -> None:

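The refactored entry point drops the hand-rolled httpx/A2A wiring and delegates everything to the client-side Agent class, leaving only colorized printing of the tagged stream. Below is a minimal runnable sketch of that tag handling; fake_stream is a hypothetical stand-in for Agent.stream, while the tag format and colorama usage are taken from the diff above.

import asyncio

import colorama


async def fake_stream():
    # Hypothetical stand-in for Agent.stream: yields tagged text chunks.
    for chunk in ['<Agent name="demo">\n', 'hello ', 'world\n', '</Agent>\n']:
        yield chunk


async def main():
    async for chunk in fake_stream():
        if chunk.startswith('<Agent name="'):
            # Opening tag: switch the terminal color to cyan.
            print(colorama.Fore.CYAN + chunk, end='', flush=True)
        elif chunk.startswith('</Agent>'):
            # Closing tag: restore the default color.
            print(colorama.Fore.RESET + chunk, end='', flush=True)
        else:
            print(chunk, end='', flush=True)


asyncio.run(main())
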
examples/no-llm-framework/src/no_llm_framework/client/agent.py

Lines changed: 88 additions & 37 deletions
@@ -68,6 +68,11 @@ def __init__(
         self.agents_registry: dict[str, AgentCard] = {}

     async def get_agents(self) -> tuple[dict[str, AgentCard], str]:
+        """Retrieve agent cards from all agent URLs and render the agent prompt.
+
+        Returns:
+            tuple[dict[str, AgentCard], str]: A dictionary mapping agent names to AgentCard objects, and the rendered agent prompt string.
+        """  # noqa: E501
         async with httpx.AsyncClient() as httpx_client:
             card_resolvers = [
                 A2ACardResolver(httpx_client, url) for url in self.agent_urls
@@ -85,6 +90,14 @@ async def get_agents(self) -> tuple[dict[str, AgentCard], str]:
         return agents_registry, agent_prompt

     def call_llm(self, prompt: str) -> str:
+        """Call the LLM with the given prompt and return the response as a string or generator.
+
+        Args:
+            prompt (str): The prompt to send to the LLM.
+
+        Returns:
+            str or Generator[str]: The LLM response as a string or generator, depending on mode.
+        """  # noqa: E501
         if self.mode == 'complete':
             return stream_llm(prompt)

@@ -94,10 +107,31 @@ def call_llm(self, prompt: str) -> str:
         return result

     async def decide(
-        self, question: str, agents_prompt: str
+        self,
+        question: str,
+        agents_prompt: str,
+        called_agents: list[dict] | None = None,
     ) -> Generator[str, None]:
+        """Decide which agent(s) to use to answer the question.
+
+        Args:
+            question (str): The question to answer.
+            agents_prompt (str): The prompt describing available agents.
+            called_agents (list[dict] | None): Previously called agents and their answers.
+
+        Returns:
+            Generator[str, None]: The LLM's response as a generator of strings.
+        """  # noqa: E501
+        if called_agents:
+            call_agent_prompt = agent_answer_template.render(
+                called_agents=called_agents
+            )
+        else:
+            call_agent_prompt = ''
         prompt = decide_template.render(
-            question=question, agent_prompt=agents_prompt
+            question=question,
+            agent_prompt=agents_prompt,
+            call_agent_prompt=call_agent_prompt,
         )
         return self.call_llm(prompt)

@@ -116,6 +150,15 @@ def extract_agents(self, response: str) -> list[dict]:
     async def send_message_to_an_agent(
         self, agent_card: AgentCard, message: str
     ):
+        """Send a message to a specific agent and yield the streaming response.
+
+        Args:
+            agent_card (AgentCard): The agent to send the message to.
+            message (str): The message to send.
+
+        Yields:
+            str: The streaming response from the agent.
+        """
         async with httpx.AsyncClient() as httpx_client:
             client = A2AClient(httpx_client, agent_card=agent_card)
             message = MessageSendParams(
@@ -137,65 +180,73 @@ async def send_message_to_an_agent(
             yield message.parts[0].root.text

     async def stream(self, question: str):
-        agents_registry, agent_prompt = await self.get_agents()
-        response = ''
-        for chunk in await self.decide(question, agent_prompt):
-            response += chunk
-            if self.token_stream_callback:
-                self.token_stream_callback(chunk)
-            yield chunk
-
-        agents = self.extract_agents(response)
+        """Stream the process of answering a question, possibly involving multiple agents.
+
+        Args:
+            question (str): The question to answer.
+
+        Yields:
+            str: Streaming output, including agent responses and intermediate steps.
+        """  # noqa: E501
         agent_answers: list[dict] = []
-        for agent in agents:
-            agent_response = ''
-            agent_card = agents_registry[agent['name']]
-            yield f'<Agent name="{agent["name"]}">\n'
-            async for chunk in self.send_message_to_an_agent(
-                agent_card, agent['prompt']
+        for _ in range(3):
+            agents_registry, agent_prompt = await self.get_agents()
+            response = ''
+            for chunk in await self.decide(
+                question, agent_prompt, agent_answers
             ):
-                agent_response += chunk
+                response += chunk
                 if self.token_stream_callback:
                     self.token_stream_callback(chunk)
                 yield chunk
-            yield '</Agent>\n'
-            match = re.search(
-                r'<Answer>(.*?)</Answer>', agent_response, re.DOTALL
-            )
-            answer = match.group(1).strip() if match else agent_response
-            if answer:
-                agent_answers.append(
-                    {
-                        'name': agent['name'],
-                        'prompt': agent['prompt'],
-                        'answer': answer,
-                    }
-                )
+
+            agents = self.extract_agents(response)
+            if agents:
+                for agent in agents:
+                    agent_response = ''
+                    agent_card = agents_registry[agent['name']]
+                    yield f'<Agent name="{agent["name"]}">\n'
+                    async for chunk in self.send_message_to_an_agent(
+                        agent_card, agent['prompt']
+                    ):
+                        agent_response += chunk
+                        if self.token_stream_callback:
+                            self.token_stream_callback(chunk)
+                        yield chunk
+                    yield '</Agent>\n'
+                    match = re.search(
+                        r'<Answer>(.*?)</Answer>', agent_response, re.DOTALL
+                    )
+                    answer = match.group(1).strip() if match else agent_response
+                    agent_answers.append(
+                        {
+                            'name': agent['name'],
+                            'prompt': agent['prompt'],
+                            'answer': answer,
+                        }
+                    )
             else:
-                print('<Answer> tag not found')
-        print(agent_answers)
+                return


 if __name__ == '__main__':
     import asyncio
     import colorama

     async def main():
+        """Main function to run the A2A Repo Agent client."""
         agent = Agent(
             mode='stream',
             token_stream_callback=None,
             agent_urls=['http://localhost:9999/'],
         )
-        agents_registry, agent_prompt = await agent.get_agents()
-        # agent_card = agents_registry['A2A Protocol Agent']
+
         async for chunk in agent.stream('What is A2A protocol?'):
             if chunk.startswith('<Agent name="'):
                 print(colorama.Fore.CYAN + chunk, end='', flush=True)
             elif chunk.startswith('</Agent>'):
                 print(colorama.Fore.RESET + chunk, end='', flush=True)
             else:
                 print(chunk, end='', flush=True)
-        # async for chunk in stream_response:
-        #     print(chunk, end='', flush=True)

     asyncio.run(main())
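
The reworked stream loop now runs up to three decide/act rounds and extracts each agent's final answer from an <Answer> tag before feeding it back into the next round. A small self-contained sketch of that extraction step, reusing the same regex as the code above:

import re


def extract_answer(agent_response: str) -> str:
    # Same pattern as in Agent.stream: keep the tag body when present,
    # otherwise fall back to the whole response.
    match = re.search(r'<Answer>(.*?)</Answer>', agent_response, re.DOTALL)
    return match.group(1).strip() if match else agent_response


print(extract_answer('thinking... <Answer>A2A is a protocol.</Answer>'))  # A2A is a protocol.
print(extract_answer('no tag in this response'))  # returned unchanged
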
Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+Previous agents have been called. {% for agent in called_agents %}
+- Agent: {{ agent.name }}
+- Prompt: {{ agent.prompt }}
+- Answer: {{ agent.answer }}
+{% endfor %}

examples/no-llm-framework/src/no_llm_framework/client/decide.jinja

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@ The question is:

 {{ question }}

-{{ called_agents }}
+{{ call_agent_prompt }}

 {{ agent_prompt }}

examples/no-llm-framework/src/no_llm_framework/server/agent.py
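Taken together, the two template changes mean decide.jinja no longer interpolates the raw called_agents list; Agent.decide first renders the new agent-answer template into a text block and passes it in as call_agent_prompt. A minimal sketch of that render step, with the template inlined as a string for illustration (the committed code loads it from a file):

from jinja2 import Template

# Inline copy of the new agent-answer template shown above.
agent_answer_template = Template(
    'Previous agents have been called. {% for agent in called_agents %}\n'
    '- Agent: {{ agent.name }}\n'
    '- Prompt: {{ agent.prompt }}\n'
    '- Answer: {{ agent.answer }}\n'
    '{% endfor %}'
)

called_agents = [
    {'name': 'A2A Repo Agent', 'prompt': 'What is A2A?', 'answer': 'An agent protocol.'},
]
# This rendered block is what Agent.decide passes to decide.jinja as
# call_agent_prompt; on the first round it passes '' instead.
print(agent_answer_template.render(called_agents=called_agents))
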

Lines changed: 17 additions & 1 deletion
@@ -53,10 +53,18 @@ def __init__(
         self.mcp_url = mcp_url

     def call_llm(self, prompt: str) -> Generator[str, None]:
+        """Call the LLM with the given prompt and return a generator of responses.
+
+        Args:
+            prompt (str): The prompt to send to the LLM.
+
+        Returns:
+            Generator[str, None]: A generator yielding the LLM's response.
+        """  # noqa: E501
         return stream_llm(prompt)

     async def decide(
-        self, question: str, called_tools: list[dict]
+        self, question: str, called_tools: list[dict] | None = None
     ) -> Generator[str, None]:
         """Decide which tool to use to answer the question.

@@ -107,6 +115,14 @@ async def call_tool(self, tools: list[dict]) -> list[CallToolResult]:
             )

     async def stream(self, question: str) -> AsyncGenerator[str]:
+        """Stream the process of answering a question, possibly involving tool calls.
+
+        Args:
+            question (str): The question to answer.
+
+        Yields:
+            dict: Streaming output, including intermediate steps and final result.
+        """  # noqa: E501
         called_tools = []
         for i in range(10):
             yield {

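On the server side the only behavioral change is that decide's called_tools parameter now defaults to None, so the first decision round can omit it. A toy sketch of that calling pattern; ToyAgent is a hypothetical stand-in, not the committed class:

import asyncio


class ToyAgent:
    async def decide(
        self, question: str, called_tools: list[dict] | None = None
    ) -> str:
        # Mirror the new signature: treat a missing list as "no prior calls".
        prior = called_tools or []
        return f'question={question!r}, prior tool calls={len(prior)}'


async def demo():
    agent = ToyAgent()
    print(await agent.decide('What is A2A?'))  # first round, no history
    print(await agent.decide('What is A2A?', [{'name': 'search'}]))  # later round


asyncio.run(demo())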