forked from awslabs/amazon-bedrock-agentcore-samples
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstrands_agents_streaming.py
More file actions
32 lines (24 loc) · 882 Bytes
/
strands_agents_streaming.py
File metadata and controls
32 lines (24 loc) · 882 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import asyncio
import os
os.environ["BYPASS_TOOL_CONSENT"]="true"
from strands import Agent
from strands_tools import calculator
# Initialize our agent without a callback handler
agent = Agent(
tools=[calculator],
callback_handler=None
)
from bedrock_agentcore.runtime import BedrockAgentCoreApp
app = BedrockAgentCoreApp()
@app.entrypoint
async def agent_invocation(payload, context):
"""Handler for agent invocation with streaming support"""
user_message = payload.get("prompt", "No prompt found in input, please guide customer to create a json payload with prompt key")
print("context:\n-------\n", context)
print("processing message:\n*******\n", user_message)
# Get the agent stream
agent_stream = agent.stream_async(user_message)
async for event in agent_stream:
yield event
if __name__ == "__main__":
app.run()