Replies: 1 comment 1 reply
-
|
Response from ADK Answering Agent (experimental, answer may be inaccurate) TLDR: To receive a streaming response, you need to set Hello! I can help with your question about enabling streaming responses from your Your current 1. Modify the Request PayloadIn your request_payload = {
"message": {
# ... your message properties
},
"metadata": {
# ... your metadata properties
},
"configuration" : {"blocking" : False} # Change this to false
}2. Update Client to Handle SSE StreamYour current code uses Here is a revised version of your import json
import httpx
# ... (keep your AGENT_MAP, get_google_auth_token, etc.)
async def delegate_agent(
agent_name:str,
user_request:str,
tool_context : ToolContext
):
if agent_name not in AGENT_MAP:
raise ValueError(f"Unknown agent: {agent_name}")
agent_url = AGENT_MAP[agent_name]
run_config = tool_context.run_config
custom_metadata = run_config.custom_metadata
session_state_key = f"{agent_name}_session_id"
# ... (your metadata extraction logic)
request_payload = {
"message":{
"messageId": str(uuid.uuid4()),
"role": "ROLE_USER",
"content": [{"text": user_request}]
},
"metadata":{
# ... (your metadata properties)
},
"configuration" : {"blocking" : False} # Enable streaming
}
try:
auth_token = get_google_auth_token()
full_response_text = []
async with httpx.AsyncClient(timeout=600) as client:
async with client.stream(
"POST",
f"{agent_url}/message:send",
json=request_payload,
headers={
"Authorization": f"Bearer {auth_token}",
"Content-Type": "application/json",
"Accept": "text/event-stream" # Indicate we accept SSE
}
) as response:
response.raise_for_status()
print(f"Connection opened with {agent_name}")
async for line in response.aiter_lines():
if line.startswith("data:"):
data_str = line[len("data:"):].strip()
if not data_str:
continue
try:
event = json.loads(data_str)
# Example of extracting session_id from a stream artifact
new_session_id = _extract_session_id_from_event(event)
if new_session_id:
tool_context.state[session_state_key] = new_session_id
# Example of parsing text from the stream
text_chunk = _parse_text_from_event(event)
if text_chunk:
# You can yield or process each chunk here
print(text_chunk, end="", flush=True)
full_response_text.append(text_chunk)
except json.JSONDecodeError:
print(f"\n[Warning: Could not decode JSON from stream: {data_str}]")
return f"Final response from {agent_name}:\n{' '.join(full_response_text)}"
except httpx.HTTPError as e:
raise UnsupportedOperationError(f"Error communicating with agent {agent_name}: {str(e)}")
def _extract_session_id_from_event(event: dict) -> str | None:
# This is an example; the actual structure may vary.
# You need to inspect the event structure to find the session ID.
artifacts = event.get('task', {}).get('artifacts', [])
for artifact in artifacts:
if artifact.get("name") == "vertex_session_id":
parts = artifact.get("parts", [])
if parts and "text" in parts[0]:
return parts[0]["text"].strip()
return None
def _parse_text_from_event(event: dict) -> str | None:
# This is an example for parsing text from a streaming event.
# The actual structure depends on the agent's output.
# You'll need to inspect the events to build the correct parsing logic.
artifacts = event.get('task', {}).get('artifacts', [])
for artifact in artifacts:
# Skip session ID artifacts
if artifact.get("name") == 'vertex_session_id':
continue
parts = artifact.get("parts", [])
for part in parts:
if "text" in part:
return part['text']
return NoneSummary of Changes:
For more details on how streaming is handled within ADK, you can refer to the documentation on [3] https://google.github.io/adk-docs/runtime/runconfig.html |
Beta Was this translation helpful? Give feedback.
Uh oh!
There was an error while loading. Please reload this page.
-
How to send a request to AdkApp deployed in the Vertex AI Agent Engine to receive a streaming response
How to do postman or http json SSE communication in that code, what additional code do you need before distributing it
Beta Was this translation helpful? Give feedback.
All reactions