Skip to content

Commit 1560e4d

Browse files
authored
Merge pull request #7 from brnaba-aws/feat/streaming
added streaming
2 parents beca45f + feb0682 commit 1560e4d

File tree

3 files changed

+174
-33
lines changed

3 files changed

+174
-33
lines changed

agent/strands_agent.py

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,25 +21,26 @@ def weather():
2121
agent = Agent(
2222
model=model,
2323
tools=[calculator, weather],
24-
system_prompt="You're a helpful assistant. You can do simple math calculation, and tell the weather."
24+
system_prompt="You're a helpful assistant. You can do simple math calculation, and tell the weather.",
25+
callback_handler=None
2526
)
2627

2728
@app.entrypoint
28-
def strands_agent_bedrock(payload):
29+
async def agent_invocation(payload):
2930
"""
3031
Invoke the agent with a payload
31-
32+
3233
IMPORTANT: Payload structure varies depending on invocation method:
3334
- Direct invocation (Python SDK, Console, agentcore CLI): {"prompt": "..."}
3435
- AWS SDK invocation (JS/Java/etc via InvokeAgentRuntimeCommand): {"input": {"prompt": "..."}}
35-
36+
3637
The AWS SDK automatically wraps payloads in an "input" field as part of the API contract.
3738
This function handles both formats for maximum compatibility.
3839
"""
3940
# Handle both dict and string payloads
4041
if isinstance(payload, str):
4142
payload = json.loads(payload)
42-
43+
4344
# Extract the prompt from the payload
4445
# Try AWS SDK format first (most common for production): {"input": {"prompt": "..."}}
4546
# Fall back to direct format: {"prompt": "..."}
@@ -49,14 +50,19 @@ def strands_agent_bedrock(payload):
4950
user_input = payload["input"].get("prompt")
5051
else:
5152
user_input = payload.get("prompt")
52-
53+
5354
if not user_input:
5455
raise ValueError(f"No prompt found in payload. Expected {{'prompt': '...'}} or {{'input': {{'prompt': '...'}}}}. Received: {payload}")
55-
56-
response = agent(user_input)
57-
response_text = response.message['content'][0]['text']
58-
59-
return response_text
56+
57+
# response = agent(user_input)
58+
# response_text = response.message['content'][0]['text']
59+
stream = agent.stream_async(user_input)
60+
async for event in stream:
61+
if (event.get('event',{}).get('contentBlockDelta',{}).get('delta',{}).get('text')):
62+
print(event.get('event',{}).get('contentBlockDelta',{}).get('delta',{}).get('text'))
63+
yield (event.get('event',{}).get('contentBlockDelta',{}).get('delta',{}).get('text'))
64+
65+
# return response_text
6066

6167
if __name__ == "__main__":
6268
app.run()

frontend/src/App.tsx

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -191,20 +191,54 @@ function App() {
191191
const currentPrompt = prompt;
192192
setPrompt('');
193193

194+
// Add a placeholder message for the streaming response
195+
const streamingMessageIndex = messages.length + 1;
196+
setMessages(prev => [...prev, {
197+
type: 'agent',
198+
content: '',
199+
timestamp: new Date()
200+
}]);
201+
194202
try {
195-
const data = await invokeAgent({ prompt: currentPrompt });
203+
let streamedContent = '';
204+
205+
const data = await invokeAgent({
206+
prompt: currentPrompt,
207+
onChunk: (chunk: string) => {
208+
// Accumulate the streamed content
209+
streamedContent += chunk;
210+
211+
// Update the last message with the streamed content
212+
setMessages(prev => {
213+
const updated = [...prev];
214+
updated[streamingMessageIndex] = {
215+
type: 'agent',
216+
content: streamedContent,
217+
timestamp: new Date()
218+
};
219+
return updated;
220+
});
221+
}
222+
});
196223

197-
const agentMessage: Message = {
198-
type: 'agent',
199-
content: cleanResponse(data.response || ''),
200-
timestamp: new Date()
201-
};
224+
// Update with the final cleaned response
225+
const finalContent = cleanResponse(data.response || streamedContent);
226+
setMessages(prev => {
227+
const updated = [...prev];
228+
updated[streamingMessageIndex] = {
229+
type: 'agent',
230+
content: finalContent,
231+
timestamp: new Date()
232+
};
233+
return updated;
234+
});
202235

203-
setMessages(prev => [...prev, agentMessage]);
204236
// Show support prompts after agent responds
205237
setShowSupportPrompts(true);
206238
} catch (err: any) {
207239
setError(err.message);
240+
// Remove the placeholder message on error
241+
setMessages(prev => prev.slice(0, -1));
208242
} finally {
209243
setLoading(false);
210244
}
@@ -574,4 +608,4 @@ function App() {
574608
);
575609
}
576610

577-
export default App;
611+
export default App;

frontend/src/agentcore.ts

Lines changed: 115 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ const localAgentUrl = (import.meta as any).env.VITE_AGENT_RUNTIME_URL || '/api';
88

99
export interface InvokeAgentRequest {
1010
prompt: string;
11+
onChunk?: (chunk: string) => void;
1112
}
1213

1314
export interface InvokeAgentResponse {
@@ -20,7 +21,7 @@ export const invokeAgent = async (request: InvokeAgentRequest): Promise<InvokeAg
2021
if (isLocalDev) {
2122
console.log('Invoking local AgentCore:', { url: localAgentUrl });
2223
console.log('Request payload:', { prompt: request.prompt });
23-
24+
2425
const response = await fetch(`${localAgentUrl}/invocations`, {
2526
method: 'POST',
2627
headers: {
@@ -30,15 +31,65 @@ export const invokeAgent = async (request: InvokeAgentRequest): Promise<InvokeAg
3031
prompt: request.prompt
3132
}),
3233
});
33-
34+
3435
console.log('Local AgentCore response status:', response.status);
35-
36+
3637
if (!response.ok) {
3738
const errorText = await response.text();
3839
console.error('Local AgentCore error response:', errorText);
3940
throw new Error(`Local AgentCore invocation failed: ${response.status} ${response.statusText} - ${errorText}`);
4041
}
4142

43+
// Check if streaming callback is provided
44+
if (request.onChunk && response.body) {
45+
const reader = response.body.getReader();
46+
const decoder = new TextDecoder();
47+
let fullResponse = '';
48+
let buffer = '';
49+
50+
try {
51+
while (true) {
52+
const { done, value } = await reader.read();
53+
54+
if (done) {
55+
break;
56+
}
57+
58+
const chunk = decoder.decode(value, { stream: true });
59+
buffer += chunk;
60+
61+
// Process complete SSE messages in the buffer
62+
const lines = buffer.split('\n');
63+
// Keep the last incomplete line in the buffer
64+
buffer = lines.pop() || '';
65+
66+
for (const line of lines) {
67+
if (line.startsWith('data: ')) {
68+
const data = line.slice(6); // Remove 'data: ' prefix
69+
70+
// Try to parse as JSON string
71+
try {
72+
const parsed = JSON.parse(data);
73+
fullResponse += parsed;
74+
// Call the chunk callback with parsed content
75+
request.onChunk(parsed);
76+
} catch {
77+
// If not JSON, use the raw data
78+
fullResponse += data;
79+
request.onChunk(data);
80+
}
81+
}
82+
}
83+
}
84+
85+
console.log('Streaming completed. Full response:', fullResponse);
86+
return { response: fullResponse };
87+
} finally {
88+
reader.releaseLock();
89+
}
90+
}
91+
92+
// Non-streaming mode (backward compatibility)
4293
let data;
4394
try {
4495
data = await response.json();
@@ -49,7 +100,7 @@ export const invokeAgent = async (request: InvokeAgentRequest): Promise<InvokeAg
49100
console.log('Raw response text:', textResponse);
50101
throw new Error(`Invalid JSON response from local AgentCore: ${textResponse}`);
51102
}
52-
103+
53104
// Handle different response formats from AgentCore
54105
let responseText = '';
55106
if (typeof data === 'string') {
@@ -59,9 +110,9 @@ export const invokeAgent = async (request: InvokeAgentRequest): Promise<InvokeAg
59110
} else {
60111
responseText = 'No response from agent';
61112
}
62-
113+
63114
console.log('Final response text:', responseText);
64-
115+
65116
return {
66117
response: responseText
67118
};
@@ -82,13 +133,13 @@ export const invokeAgent = async (request: InvokeAgentRequest): Promise<InvokeAg
82133

83134
// URL encode the agent runtime ARN for the API call (as per AWS documentation)
84135
const encodedAgentRuntimeArn = encodeURIComponent(agentRuntimeArn);
85-
136+
86137
// Use the correct AgentCore endpoint format from AWS documentation
87138
const url = `https://bedrock-agentcore.${region}.amazonaws.com/runtimes/${encodedAgentRuntimeArn}/invocations?qualifier=DEFAULT`;
88-
139+
89140
console.log('Invoking AgentCore directly:', { url, agentRuntimeArn, region });
90141
console.log('Request payload:', { prompt: request.prompt });
91-
142+
92143
const response = await fetch(url, {
93144
method: 'POST',
94145
headers: {
@@ -101,7 +152,7 @@ export const invokeAgent = async (request: InvokeAgentRequest): Promise<InvokeAg
101152
prompt: request.prompt
102153
}),
103154
});
104-
155+
105156
console.log('AgentCore response status:', response.status);
106157
console.log('AgentCore response headers:', Object.fromEntries(response.headers.entries()));
107158

@@ -111,6 +162,56 @@ export const invokeAgent = async (request: InvokeAgentRequest): Promise<InvokeAg
111162
throw new Error(`AgentCore invocation failed: ${response.status} ${response.statusText} - ${errorText}`);
112163
}
113164

165+
// Check if streaming callback is provided
166+
if (request.onChunk && response.body) {
167+
const reader = response.body.getReader();
168+
const decoder = new TextDecoder();
169+
let fullResponse = '';
170+
let buffer = '';
171+
172+
try {
173+
while (true) {
174+
const { done, value } = await reader.read();
175+
176+
if (done) {
177+
break;
178+
}
179+
180+
const chunk = decoder.decode(value, { stream: true });
181+
buffer += chunk;
182+
183+
// Process complete SSE messages in the buffer
184+
const lines = buffer.split('\n');
185+
// Keep the last incomplete line in the buffer
186+
buffer = lines.pop() || '';
187+
188+
for (const line of lines) {
189+
if (line.startsWith('data: ')) {
190+
const data = line.slice(6); // Remove 'data: ' prefix
191+
192+
// Try to parse as JSON string
193+
try {
194+
const parsed = JSON.parse(data);
195+
fullResponse += parsed;
196+
// Call the chunk callback with parsed content
197+
request.onChunk(parsed);
198+
} catch {
199+
// If not JSON, use the raw data
200+
fullResponse += data;
201+
request.onChunk(data);
202+
}
203+
}
204+
}
205+
}
206+
207+
console.log('Streaming completed. Full response:', fullResponse);
208+
return { response: fullResponse };
209+
} finally {
210+
reader.releaseLock();
211+
}
212+
}
213+
214+
// Non-streaming mode (backward compatibility)
114215
let data;
115216
try {
116217
data = await response.json();
@@ -121,7 +222,7 @@ export const invokeAgent = async (request: InvokeAgentRequest): Promise<InvokeAg
121222
console.log('Raw response text:', textResponse);
122223
throw new Error(`Invalid JSON response from AgentCore: ${textResponse}`);
123224
}
124-
225+
125226
// Handle different response formats from AgentCore
126227
let responseText = '';
127228
if (typeof data === 'string') {
@@ -131,9 +232,9 @@ export const invokeAgent = async (request: InvokeAgentRequest): Promise<InvokeAg
131232
} else {
132233
responseText = 'No response from agent';
133234
}
134-
235+
135236
console.log('Final response text:', responseText);
136-
237+
137238
return {
138239
response: responseText
139240
};
@@ -142,4 +243,4 @@ export const invokeAgent = async (request: InvokeAgentRequest): Promise<InvokeAg
142243
console.error('AgentCore invocation error:', error);
143244
throw new Error(`Failed to invoke agent: ${error.message}`);
144245
}
145-
};
246+
};

0 commit comments

Comments
 (0)