Description
With non-streaming output the code works fine, but when I switch to streaming output it raises an error.
## examples/agent_patterns/routing.py
import asyncio
import uuid

from openai.types.responses import ResponseContentPartDoneEvent, ResponseTextDeltaEvent

from agents import (
    Agent,
    ModelSettings,
    OpenAIChatCompletionsModel,
    RawResponsesStreamEvent,
    Runner,
    TResponseInputItem,
    trace,
)

"""
This example shows the handoffs/routing pattern. The triage agent receives the first message, and
then hands off to the appropriate agent based on the language of the request. Responses are
streamed to the user.
"""

# NOTE: external_client is an AsyncOpenAI client configured elsewhere for the qwen-max endpoint
# (not shown in this snippet).

french_agent = Agent(
    name="french_agent",
    instructions="You only speak French",
    model=OpenAIChatCompletionsModel(
        model="qwen-max",
        openai_client=external_client,
    ),
    model_settings=ModelSettings(temperature=0.6),
)

spanish_agent = Agent(
    name="spanish_agent",
    instructions="You only speak Spanish",
    model=OpenAIChatCompletionsModel(
        model="qwen-max",
        openai_client=external_client,
    ),
    model_settings=ModelSettings(temperature=0.6),
)

english_agent = Agent(
    name="english_agent",
    instructions="You only speak English",
    model=OpenAIChatCompletionsModel(
        model="qwen-max",
        openai_client=external_client,
    ),
    model_settings=ModelSettings(temperature=0.6),
)

triage_agent = Agent(
    name="triage_agent",
    instructions="Handoff to the appropriate agent based on the language of the request.",
    handoffs=[french_agent, spanish_agent, english_agent],
    model=OpenAIChatCompletionsModel(
        model="qwen-max",
        openai_client=external_client,
    ),
    model_settings=ModelSettings(temperature=0.6),
)


async def main():
    # We'll create an ID for this conversation, so we can link each trace
    conversation_id = str(uuid.uuid4().hex[:16])

    msg = input("Hi! We speak French, Spanish and English. How can I help? ")
    agent = triage_agent
    inputs: list[TResponseInputItem] = [{"content": msg, "role": "user"}]

    while True:
        # Each conversation turn is a single trace. Normally, each input from the user would be an
        # API request to your app, and you can wrap the request in a trace()
        with trace("Routing example", group_id=conversation_id):
            result = Runner.run_streamed(
                agent,
                input=inputs,
            )
            async for event in result.stream_events():
                if not isinstance(event, RawResponsesStreamEvent):
                    continue
                data = event.data
                if isinstance(data, ResponseTextDeltaEvent):
                    print(data.delta, end="", flush=True)
                elif isinstance(data, ResponseContentPartDoneEvent):
                    print("\n")

        inputs = result.to_input_list()
        print("\n")

        user_msg = input("Enter a message: ")
        inputs.append({"content": user_msg, "role": "user"})
        agent = result.current_agent


if __name__ == "__main__":
    asyncio.run(main())
AttributeError Traceback (most recent call last)
Cell In[4], line 86
81 print("\n")
85 if __name__ == "__main__":
---> 86 asyncio.run(main())
File ~/Documents/software/anaconda3/envs/agents/lib/python3.11/site-packages/nest_asyncio.py:30, in _patch_asyncio.<locals>.run(main, debug)
28 task = asyncio.ensure_future(main)
29 try:
---> 30 return loop.run_until_complete(task)
31 finally:
32 if not task.done():
File ~/Documents/software/anaconda3/envs/agents/lib/python3.11/site-packages/nest_asyncio.py:98, in _patch_loop.<locals>.run_until_complete(self, future)
95 if not f.done():
96 raise RuntimeError(
97 'Event loop stopped before Future completed.')
---> 98 return f.result()
File ~/Documents/software/anaconda3/envs/agents/lib/python3.11/asyncio/futures.py:203, in Future.result(self)
201 self.__log_traceback = False
202 if self._exception is not None:
--> 203 raise self._exception.with_traceback(self._exception_tb)
204 return self._result
File ~/Documents/software/anaconda3/envs/agents/lib/python3.11/asyncio/tasks.py:277, in Task.__step(failed resolving arguments)
273 try:
274 if exc is None:
275 # We use the `send` method directly, because coroutines
276 # don't have `__iter__` and `__next__` methods.
--> 277 result = coro.send(None)
278 else:
279 result = coro.throw(exc)
Cell In[4], line 71
66 with trace("Routing example", group_id=conversation_id):
67 result = Runner.run_streamed(
68 agent,
69 input=inputs,
70 )
---> 71 async for event in result.stream_events():
72 if not isinstance(event, RawResponsesStreamEvent):
73 continue
File ~/Documents/software/anaconda3/envs/agents/lib/python3.11/site-packages/agents/result.py:182, in RunResultStreaming.stream_events(self)
179 self._cleanup_tasks()
181 if self._stored_exception:
--> 182 raise self._stored_exception
File ~/Documents/software/anaconda3/envs/agents/lib/python3.11/asyncio/tasks.py:277, in Task.__step(failed resolving arguments)
273 try:
274 if exc is None:
275 # We use the `send` method directly, because coroutines
276 # don't have `__iter__` and `__next__` methods.
--> 277 result = coro.send(None)
278 else:
279 result = coro.throw(exc)
File ~/Documents/software/anaconda3/envs/agents/lib/python3.11/site-packages/agents/run.py:537, in Runner._run_streamed_impl(cls, starting_input, streamed_result, starting_agent, max_turns, hooks, context_wrapper, run_config)
526 streamed_result._input_guardrails_task = asyncio.create_task(
527 cls._run_input_guardrails_with_queue(
528 starting_agent,
(...)
534 )
535 )
536 try:
--> 537 turn_result = await cls._run_single_turn_streamed(
538 streamed_result,
539 current_agent,
540 hooks,
541 context_wrapper,
542 run_config,
543 should_run_agent_start_hooks,
544 )
545 should_run_agent_start_hooks = False
547 streamed_result.raw_responses = streamed_result.raw_responses + [
548 turn_result.model_response
549 ]
File ~/Documents/software/anaconda3/envs/agents/lib/python3.11/site-packages/agents/run.py:654, in Runner._run_single_turn_streamed(cls, streamed_result, agent, hooks, context_wrapper, run_config, should_run_agent_start_hooks)
639 async for event in model.stream_response(
640 system_prompt,
641 input,
(...)
648 ),
649 ):
650 if isinstance(event, ResponseCompletedEvent):
651 usage = (
652 Usage(
653 requests=1,
--> 654 input_tokens=event.response.usage.input_tokens,
655 output_tokens=event.response.usage.output_tokens,
656 total_tokens=event.response.usage.total_tokens,
657 )
658 if event.response.usage
659 else Usage()
660 )
661 final_response = ModelResponse(
662 output=event.response.output,
663 usage=usage,
664 referenceable_id=event.response.id,
665 )
667 streamed_result._event_queue.put_nowait(RawResponsesStreamEvent(data=event))
File ~/Documents/software/anaconda3/envs/agents/lib/python3.11/site-packages/pydantic/main.py:891, in BaseModel.__getattr__(self, item)
888 return super().__getattribute__(item)  # Raises AttributeError if appropriate
889 else:
890 # this is the current error
--> 891 raise AttributeError(f'{type(self).__name__!r} object has no attribute {item!r}')
AttributeError: 'CompletionUsage' object has no attribute 'input_tokens'
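
For reference, the mismatch appears to be between the two usage models in the openai package: the Chat Completions CompletionUsage only carries prompt_tokens / completion_tokens, while the input_tokens / output_tokens attributes read at run.py:654 exist on the Responses-style usage model. A minimal check of this (a sketch assuming the usual import paths in the openai Python package, independent of the agents SDK):

from openai.types import CompletionUsage          # usage model returned by Chat Completions
from openai.types.responses import ResponseUsage  # usage model returned by the Responses API

# Chat Completions usage exposes prompt_tokens / completion_tokens ...
print("prompt_tokens" in CompletionUsage.model_fields)   # True
print("input_tokens" in CompletionUsage.model_fields)    # False -> the AttributeError above

# ... while the Responses usage model has the fields run.py:654 expects.
print("input_tokens" in ResponseUsage.model_fields)      # True
print("output_tokens" in ResponseUsage.model_fields)     # True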