Skip to content

Commit 1ab3b41

Browse files
committed
refractor app service
1 parent 5883858 commit 1ab3b41

File tree

5 files changed

+266
-105
lines changed

5 files changed

+266
-105
lines changed

src/backend/app_kernel.py

Lines changed: 73 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import logging
44
import os
55
import uuid
6+
import re
67
from typing import List, Dict, Optional, Any
78

89
# FastAPI imports
@@ -85,51 +86,7 @@
8586
async def input_task_endpoint(input_task: InputTask, request: Request):
8687
"""
8788
Receive the initial input task from the user.
88-
89-
---
90-
tags:
91-
- Input Task
92-
parameters:
93-
- name: user_principal_id
94-
in: header
95-
type: string
96-
required: true
97-
description: User ID extracted from the authentication header
98-
- name: body
99-
in: body
100-
required: true
101-
schema:
102-
type: object
103-
properties:
104-
session_id:
105-
type: string
106-
description: Optional session ID, generated if not provided
107-
description:
108-
type: string
109-
description: The task description
110-
user_id:
111-
type: string
112-
description: The user ID associated with the task
113-
responses:
114-
200:
115-
description: Task created successfully
116-
schema:
117-
type: object
118-
properties:
119-
status:
120-
type: string
121-
session_id:
122-
type: string
123-
plan_id:
124-
type: string
125-
description:
126-
type: string
127-
user_id:
128-
type: string
129-
400:
130-
description: Missing or invalid user information
13189
"""
132-
13390
if not rai_success(input_task.description):
13491
print("RAI failed")
13592

@@ -150,63 +107,91 @@ async def input_task_endpoint(input_task: InputTask, request: Request):
150107

151108
if not user_id:
152109
track_event_if_configured("UserIdNotFound", {"status_code": 400, "detail": "no user"})
153-
154110
raise HTTPException(status_code=400, detail="no user")
111+
112+
# Generate session ID if not provided
155113
if not input_task.session_id:
156114
input_task.session_id = str(uuid.uuid4())
157-
158-
# Get the agents for this session
159-
agents = await get_agents(input_task.session_id, user_id)
160115

161-
# Send the task to the planner agent
162-
planner_agent = agents["PlannerAgent"]
116+
# Set user ID from authenticated user
117+
input_task.user_id = user_id
163118

164-
# Convert input task to JSON for the kernel function
165-
input_task_json = input_task.json()
166-
167-
# Use the planner to handle the task
168-
result = await planner_agent.handle_input_task(
169-
KernelArguments(input_task_json=input_task_json)
170-
)
171-
172-
# Extract plan ID from the result
173-
# This is a simplified approach - in a real system,
174-
# we would properly parse the result to get the plan ID
175-
memory_store = planner_agent._memory_store
176-
plan = await memory_store.get_plan_by_session(input_task.session_id)
177-
178-
if not plan or not plan.id:
119+
try:
120+
# Create just the planner agent instead of all agents
121+
kernel, memory_store = await initialize_runtime_and_context(input_task.session_id, user_id)
122+
planner_agent = await AgentFactory.create_agent(
123+
agent_type=AgentType.PLANNER,
124+
session_id=input_task.session_id,
125+
user_id=user_id
126+
)
127+
128+
# Use the planner to handle the task - pass input_task_json for compatibility
129+
input_task_json = input_task.json()
130+
result = await planner_agent.handle_input_task(
131+
KernelArguments(input_task_json=input_task_json)
132+
)
133+
134+
# Get plan from memory store
135+
plan = await memory_store.get_plan_by_session(input_task.session_id)
136+
137+
if not plan or not plan.id:
138+
# If plan not found by session, try to extract plan ID from result
139+
plan_id_match = re.search(r"Plan '([^']+)'", result)
140+
141+
if plan_id_match:
142+
plan_id = plan_id_match.group(1)
143+
plan = await memory_store.get_plan(plan_id)
144+
145+
# If still no plan found, handle the failure
146+
if not plan or not plan.id:
147+
track_event_if_configured(
148+
"PlanCreationFailed",
149+
{
150+
"session_id": input_task.session_id,
151+
"description": input_task.description,
152+
}
153+
)
154+
return {
155+
"status": "Error: Failed to create plan",
156+
"session_id": input_task.session_id,
157+
"plan_id": "",
158+
"description": input_task.description,
159+
}
160+
161+
# Log custom event for successful input task processing
179162
track_event_if_configured(
180-
"PlanCreationFailed",
163+
"InputTaskProcessed",
181164
{
165+
"status": f"Plan created with ID: {plan.id}",
182166
"session_id": input_task.session_id,
167+
"plan_id": plan.id,
183168
"description": input_task.description,
184-
}
169+
},
185170
)
171+
186172
return {
187-
"status": "Error: Failed to create plan",
173+
"status": f"Plan created with ID: {plan.id}",
188174
"session_id": input_task.session_id,
189-
"plan_id": "",
175+
"plan_id": plan.id,
190176
"description": input_task.description,
191177
}
192-
193-
# Log custom event for successful input task processing
194-
track_event_if_configured(
195-
"InputTaskProcessed",
196-
{
197-
"status": f"Plan created with ID: {plan.id}",
178+
179+
except Exception as e:
180+
logging.exception(f"Error handling input task: {e}")
181+
track_event_if_configured(
182+
"InputTaskError",
183+
{
184+
"session_id": input_task.session_id,
185+
"description": input_task.description,
186+
"error": str(e),
187+
}
188+
)
189+
return {
190+
"status": f"Error creating plan: {str(e)}",
198191
"session_id": input_task.session_id,
199-
"plan_id": plan.id,
192+
"plan_id": "",
200193
"description": input_task.description,
201-
},
202-
)
203-
204-
return {
205-
"status": f"Plan created with ID: {plan.id}",
206-
"session_id": input_task.session_id,
207-
"plan_id": plan.id,
208-
"description": input_task.description,
209-
}
194+
}
210195

211196

212197
@app.post("/human_feedback")
@@ -658,6 +643,9 @@ async def get_agent_messages(session_id: str, request: Request) -> List[AgentMes
658643
- Agent Messages
659644
parameters:
660645
- name: session_id
646+
in: path
647+
type: string
648+
required: true
661649
in: path
662650
type: string
663651
required: true

src/backend/kernel_agents/agent_base.py

Lines changed: 147 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,132 @@ async def handle_action_request_wrapper(*args, **kwargs):
127127
# Use agent name as plugin for handler
128128
self._kernel.add_function(self._agent_name, kernel_func)
129129

130+
async def handle_action_request(self, action_request_json: str) -> str:
131+
"""Handle an action request from another agent or the system.
132+
133+
Args:
134+
action_request_json: The action request as a JSON string
135+
136+
Returns:
137+
A JSON string containing the action response
138+
"""
139+
# Parse the action request
140+
action_request_dict = json.loads(action_request_json)
141+
action_request = ActionRequest(**action_request_dict)
142+
143+
# Get the step from memory
144+
step: Step = await self._memory_store.get_step(
145+
action_request.step_id, action_request.session_id
146+
)
147+
148+
if not step:
149+
# Create error response if step not found
150+
response = ActionResponse(
151+
step_id=action_request.step_id,
152+
status=StepStatus.failed,
153+
message="Step not found in memory.",
154+
)
155+
return response.json()
156+
157+
# Add messages to chat history for context
158+
# This gives the agent visibility of the conversation history
159+
self._chat_history.extend([
160+
{"role": "assistant", "content": action_request.action},
161+
{"role": "user", "content": f"{step.human_feedback}. Now make the function call"}
162+
])
163+
164+
try:
165+
# Use the agent to process the action
166+
chat_history = self._chat_history.copy()
167+
168+
# Call the agent to handle the action
169+
agent_response = await self._agent.invoke(self._kernel, f"{action_request.action}\n\nPlease perform this action")
170+
result = str(agent_response)
171+
172+
# Store agent message in cosmos memory
173+
await self._memory_store.add_item(
174+
AgentMessage(
175+
session_id=action_request.session_id,
176+
user_id=self._user_id,
177+
plan_id=action_request.plan_id,
178+
content=f"{result}",
179+
source=self._agent_name,
180+
step_id=action_request.step_id,
181+
)
182+
)
183+
184+
# Track telemetry
185+
track_event_if_configured(
186+
"Base agent - Added into the cosmos",
187+
{
188+
"session_id": action_request.session_id,
189+
"user_id": self._user_id,
190+
"plan_id": action_request.plan_id,
191+
"content": f"{result}",
192+
"source": self._agent_name,
193+
"step_id": action_request.step_id,
194+
},
195+
)
196+
197+
except Exception as e:
198+
logging.exception(f"Error during agent execution: {e}")
199+
200+
# Track error in telemetry
201+
track_event_if_configured(
202+
"Base agent - Error during agent execution, captured into the cosmos",
203+
{
204+
"session_id": action_request.session_id,
205+
"user_id": self._user_id,
206+
"plan_id": action_request.plan_id,
207+
"content": f"{e}",
208+
"source": self._agent_name,
209+
"step_id": action_request.step_id,
210+
},
211+
)
212+
213+
# Return an error response
214+
response = ActionResponse(
215+
step_id=action_request.step_id,
216+
plan_id=action_request.plan_id,
217+
session_id=action_request.session_id,
218+
result=f"Error: {str(e)}",
219+
status=StepStatus.failed,
220+
)
221+
return response.json()
222+
223+
logging.info(f"Task completed: {result}")
224+
225+
# Update step status
226+
step.status = StepStatus.completed
227+
step.agent_reply = result
228+
await self._memory_store.update_step(step)
229+
230+
# Track step completion in telemetry
231+
track_event_if_configured(
232+
"Base agent - Updated step and updated into the cosmos",
233+
{
234+
"status": StepStatus.completed,
235+
"session_id": action_request.session_id,
236+
"agent_reply": f"{result}",
237+
"user_id": self._user_id,
238+
"plan_id": action_request.plan_id,
239+
"content": f"{result}",
240+
"source": self._agent_name,
241+
"step_id": action_request.step_id,
242+
},
243+
)
244+
245+
# Create and return action response
246+
response = ActionResponse(
247+
step_id=step.id,
248+
plan_id=step.plan_id,
249+
session_id=action_request.session_id,
250+
result=result,
251+
status=StepStatus.completed,
252+
)
253+
254+
return response.json()
255+
130256
async def invoke_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str:
131257
"""Invoke a specific tool by name with the provided arguments.
132258
@@ -275,6 +401,11 @@ def get_tools_from_config(cls, kernel: sk.Kernel, agent_type: str, config_path:
275401
kernel_functions = []
276402
plugin_name = f"{agent_type}_plugin"
277403

404+
# Early return if no tools defined - prevent empty iteration
405+
if not config.get("tools"):
406+
logging.info(f"No tools defined for agent type '{agent_type}'. Returning empty list.")
407+
return kernel_functions
408+
278409
for tool in config.get("tools", []):
279410
try:
280411
function_name = tool["name"]
@@ -301,8 +432,22 @@ def get_tools_from_config(cls, kernel: sk.Kernel, agent_type: str, config_path:
301432
# Register the function with the kernel
302433
kernel.add_function(plugin_name, kernel_func)
303434
kernel_functions.append(kernel_func)
304-
#logging.info(f"Successfully created dynamic tool '{function_name}' for {agent_type}")
435+
logging.debug(f"Successfully created dynamic tool '{function_name}' for {agent_type}")
305436
except Exception as e:
306437
logging.error(f"Failed to create tool '{tool.get('name', 'unknown')}': {str(e)}")
307438

308-
return kernel_functions
439+
# Log the total number of tools created
440+
if kernel_functions:
441+
logging.info(f"Created {len(kernel_functions)} tools for agent type '{agent_type}'")
442+
else:
443+
logging.info(f"No tools were successfully created for agent type '{agent_type}'")
444+
445+
return kernel_functions
446+
447+
def save_state(self) -> Mapping[str, Any]:
448+
"""Save the state of this agent."""
449+
return {"memory": self._memory_store.save_state()}
450+
451+
def load_state(self, state: Mapping[str, Any]) -> None:
452+
"""Load the state of this agent."""
453+
self._memory_store.load_state(state["memory"])

0 commit comments

Comments
 (0)