Skip to content

Commit 3b09490

Browse files
Merge pull request #376 from cnoe-io/fix-aws
fix: make AWS agent to run in executor to prevent blocking
2 parents c6d7705 + 2581e89 commit 3b09490

File tree

3 files changed

+74
-60
lines changed

3 files changed

+74
-60
lines changed

ai_platform_engineering/agents/aws/agent_aws/protocol_bindings/a2a_server/agent.py

Lines changed: 8 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -15,43 +15,25 @@ class AWSAgent:
1515

1616
def __init__(self):
1717
"""Initialize the A2A AWS Agent."""
18-
self._agent = None
18+
logger.info("Initializing AWS Agent and MCP servers...")
19+
# Initialize agent eagerly to download MCP packages at startup
20+
self._agent = BaseAWSAgent()
21+
logger.info("AWS Agent initialized successfully")
1922

2023
async def _get_agent(self) -> BaseAWSAgent:
2124
"""Get or create the agent instance."""
22-
if self._agent is None:
23-
self._agent = BaseAWSAgent()
2425
return self._agent
2526

2627
async def stream(self, query: str, context_id: str = None) -> AsyncIterator[Dict[str, Any]]:
2728
"""Stream response from the agent."""
2829
agent = await self._get_agent()
29-
30+
3031
# Run the synchronous agent in an executor to avoid blocking
3132
loop = asyncio.get_event_loop()
3233
response = await loop.run_in_executor(None, agent.run_sync, query)
33-
34-
# For now, we'll implement basic streaming by chunking the response
35-
# The underlying agent doesn't support native streaming yet
36-
words = response.split()
37-
chunk_size = 10 # words per chunk
38-
39-
for i in range(0, len(words), chunk_size):
40-
chunk = " ".join(words[i:i+chunk_size])
41-
if i + chunk_size < len(words):
42-
chunk += " "
43-
44-
# Send intermediate chunks
45-
yield {
46-
'content': chunk,
47-
'is_task_complete': False,
48-
'context_id': context_id
49-
}
50-
51-
# Small delay to simulate streaming
52-
await asyncio.sleep(0.1)
53-
54-
# Send final completion event
34+
35+
# Send final completion event with full response
36+
# Don't send fake intermediate chunks - just send the complete response
5537
yield {
5638
'content': response,
5739
'is_task_complete': True,
@@ -60,7 +42,5 @@ async def stream(self, query: str, context_id: str = None) -> AsyncIterator[Dict
6042

6143
def run_sync(self, query: str) -> str:
6244
"""Run the agent synchronously."""
63-
if self._agent is None:
64-
self._agent = BaseAWSAgent()
6545
result = self._agent.chat(query)
6646
return result.get("answer", "No response generated")

ai_platform_engineering/agents/aws/agent_aws/protocol_bindings/a2a_server/agent_executor.py

Lines changed: 65 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -13,70 +13,103 @@
1313
TaskStatus,
1414
TaskStatusUpdateEvent,
1515
)
16-
from a2a.utils import new_task, new_text_artifact
16+
from a2a.utils import new_agent_text_message, new_task, new_text_artifact
1717

1818

1919
logger = logging.getLogger(__name__)
2020

2121

2222
class AWSAgentExecutor(AgentExecutor):
2323
"""A2A Agent Executor for AWS Agent."""
24-
24+
2525
SUPPORTED_CONTENT_TYPES = ["text/plain"]
26-
26+
2727
def __init__(self):
2828
"""Initialize the AWS Agent Executor."""
2929
self.agent = AWSAgent()
3030
logger.info("AWS Agent Executor initialized")
31-
31+
3232
@override
3333
async def execute(
3434
self,
3535
context: RequestContext,
3636
event_queue: EventQueue,
3737
) -> None:
3838
"""Execute the agent with the given context.
39-
39+
4040
Args:
4141
context: Request context containing user input and task info
4242
event_queue: Event queue for publishing task updates
4343
"""
4444
query = context.get_user_input()
4545
task = context.current_task
46+
context_id = context.message.contextId if context.message else None
4647

4748
if not context.message:
4849
raise Exception('No message provided')
4950

5051
if not task:
5152
task = new_task(context.message)
5253
await event_queue.enqueue_event(task)
53-
54+
5455
try:
55-
# Use the run_sync method from the A2A wrapper
56-
content = self.agent.run_sync(query)
57-
58-
await event_queue.enqueue_event(
59-
TaskArtifactUpdateEvent(
60-
append=False,
61-
contextId=task.contextId,
62-
taskId=task.id,
63-
lastChunk=True,
64-
artifact=new_text_artifact(
65-
name='current_result',
66-
description='Result of request to agent.',
67-
text=content
56+
# Run agent and stream response
57+
async for event in self.agent.stream(query, context_id):
58+
if event['is_task_complete']:
59+
# Send artifact chunk that client can accumulate
60+
await event_queue.enqueue_event(
61+
TaskArtifactUpdateEvent(
62+
append=False,
63+
contextId=task.contextId,
64+
taskId=task.id,
65+
lastChunk=False,
66+
artifact=new_text_artifact(
67+
name='current_result',
68+
description='Result of request to agent.',
69+
text=event['content'],
70+
),
71+
)
72+
)
73+
await event_queue.enqueue_event(
74+
TaskStatusUpdateEvent(
75+
status=TaskStatus(state=TaskState.completed),
76+
final=True,
77+
contextId=task.contextId,
78+
taskId=task.id,
79+
)
80+
)
81+
elif event['require_user_input']:
82+
await event_queue.enqueue_event(
83+
TaskStatusUpdateEvent(
84+
status=TaskStatus(
85+
state=TaskState.input_required,
86+
message=new_agent_text_message(
87+
event['content'],
88+
task.contextId,
89+
task.id,
90+
),
91+
),
92+
final=True,
93+
contextId=task.contextId,
94+
taskId=task.id,
95+
)
96+
)
97+
else:
98+
await event_queue.enqueue_event(
99+
TaskStatusUpdateEvent(
100+
status=TaskStatus(
101+
state=TaskState.working,
102+
message=new_agent_text_message(
103+
event['content'],
104+
task.contextId,
105+
task.id,
106+
),
107+
),
108+
final=False,
109+
contextId=task.contextId,
110+
taskId=task.id,
111+
)
68112
)
69-
)
70-
)
71-
await event_queue.enqueue_event(
72-
TaskStatusUpdateEvent(
73-
status=TaskStatus(state=TaskState.completed),
74-
final=True,
75-
contextId=task.contextId,
76-
taskId=task.id,
77-
)
78-
)
79-
80113
except Exception as e:
81114
logger.error(f"Error executing agent: {e}")
82115
await event_queue.enqueue_event(
@@ -100,15 +133,15 @@ async def execute(
100133
taskId=task.id,
101134
)
102135
)
103-
136+
104137
@override
105138
async def cancel(
106139
self,
107140
context: RequestContext,
108141
event_queue: EventQueue,
109142
) -> None:
110143
"""Cancel the current task execution.
111-
144+
112145
Args:
113146
context: Request context
114147
event_queue: Event queue for publishing cancellation updates

docker-compose/docker-compose.rag-only.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ services:
4545
- ENABLE_WEBEX_AGENT=false
4646
- ENABLE_PETSTORE_AGENT=false
4747
- ENABLE_RAG=true
48+
- RAG_AGENT_PORT=8099
4849
depends_on:
4950
- agent-rag-rag-only-p2p
5051
command: platform-engineer

0 commit comments

Comments
 (0)