Description
Problem Description
In the current implementation, there is an issue with the await self.task_manager.on_send_task_subscribe(json_rpc_request) call in server.py. The problem occurs because:
- The method on_send_task_subscribe() is declared to return Union[AsyncIterable[SendTaskStreamingResponse], JSONRPCResponse]
- When it returns an AsyncIterable (asynchronous generator), we cannot directly use await on it
- This causes the error: TypeError: object async_generator can't be used in 'await' expression
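The failure can be reproduced outside the project with a few lines. This is a minimal sketch, not the project's code: stream_updates is a hypothetical stand-in for on_send_task_subscribe(), and the exact wording of the TypeError may differ between Python versions.

```python
import asyncio


async def stream_updates():
    # Hypothetical stand-in for on_send_task_subscribe(): because the body
    # contains `yield`, calling it returns an async generator, not a coroutine.
    yield "working"
    yield "completed"


async def main():
    try:
        # Mirrors the failing call: awaiting an async generator object.
        await stream_updates()
    except TypeError as exc:
        return str(exc)
    return None


message = asyncio.run(main())
print(message)  # the TypeError text; wording depends on the Python version
```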
Expected Behavior
For streaming requests (SendTaskStreamingRequest), the system should properly handle the asynchronous streaming response without raising exceptions.
Current Behavior
The code attempts to await an asynchronous generator, which is not a valid operation in Python, resulting in the mentioned error.
Solution Options
Option 1: Change the calling code (Recommended)
elif isinstance(json_rpc_request, SendTaskStreamingRequest):
    # Don't await the async generator; pass it on directly
    result = self.task_manager.on_send_task_subscribe(json_rpc_request)
Option 2: Modify the method implementation (if keeping await is necessary)
Change on_send_task_subscribe() to return a coroutine that collects all streaming responses:
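async def on_send_task_subscribe(
    self, request: SendTaskStreamingRequest
) -> Union[List[SendTaskStreamingResponse], JSONRPCResponse]:
    # original_async_gen is a placeholder for the async generator
    # that the current implementation produces for this request.
    responses = []
    async for item in original_async_gen:
        responses.append(item)
    return responses
Recommendation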
Option 1 is preferred as it maintains the streaming nature of the response. Option 2 would convert the streaming response into a batch response, which may not be desirable for the intended use case.
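To illustrate Option 1 end to end, here is a minimal sketch of a handler that calls the method without await and then iterates over the result. FakeTaskManager, handle_streaming_request, and the dict-shaped request and responses are hypothetical stand-ins, not the project's actual classes. The inspect.isawaitable check is an added assumption so the same handler also works if an implementation is a plain coroutine that returns a JSONRPCResponse or an AsyncIterable.

```python
import asyncio
import inspect
from collections.abc import AsyncIterable


class FakeTaskManager:
    # Hypothetical stand-in for the real task manager.
    async def on_send_task_subscribe(self, request):
        # Async generator: calling this returns an AsyncIterable immediately.
        for state in ("submitted", "working", "completed"):
            yield {"id": request["id"], "state": state}


async def handle_streaming_request(task_manager, request):
    # Option 1: call without await.
    result = task_manager.on_send_task_subscribe(request)
    # Assumption: some implementations may be plain coroutines instead;
    # only those need to be awaited.
    if inspect.isawaitable(result):
        result = await result
    if isinstance(result, AsyncIterable):
        # Forward each streamed item as soon as it is produced.
        async for item in result:
            yield item
    else:
        # Single non-streaming response (for example an error).
        yield result


async def collect(request):
    manager = FakeTaskManager()
    return [item async for item in handle_streaming_request(manager, request)]


states = [item["state"] for item in asyncio.run(collect({"id": "task-1"}))]
print(states)
```

Because items are forwarded one at a time, the caller can start sending events before the task finishes, which is the property Option 2 gives up.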