From 22c3b857958434349acf329f9703c1a5a1a439ea Mon Sep 17 00:00:00 2001 From: UtkarshMishra-Microsoft Date: Thu, 10 Jul 2025 19:42:51 +0530 Subject: [PATCH 1/3] Multiple-BackgroundCheck -Fix --- .../kernel_agents/group_chat_manager.py | 206 +++++++++--------- 1 file changed, 102 insertions(+), 104 deletions(-) diff --git a/src/backend/kernel_agents/group_chat_manager.py b/src/backend/kernel_agents/group_chat_manager.py index 19215c34c..be7b2f6df 100644 --- a/src/backend/kernel_agents/group_chat_manager.py +++ b/src/backend/kernel_agents/group_chat_manager.py @@ -1,3 +1,4 @@ +import asyncio import logging from datetime import datetime from typing import Dict, List, Optional @@ -12,6 +13,9 @@ # pylint: disable=E0611 from semantic_kernel.functions.kernel_function import KernelFunction +# Module-level dictionary to track step execution locks +step_execution_locks: Dict[str, asyncio.Lock] = {} + class GroupChatManager(BaseAgent): """GroupChatManager agent implementation using Semantic Kernel. @@ -309,130 +313,124 @@ async def _update_step_status( step.human_approval_status = HumanFeedbackStatus.rejected step.human_feedback = received_human_feedback - step.status = StepStatus.completed await self._memory_store.update_step(step) - track_event_if_configured( - f"{AgentType.GROUP_CHAT_MANAGER.value} - Received human feedback, Updating step and updated into the cosmos", - { - "status": StepStatus.completed, - "session_id": step.session_id, - "user_id": self._user_id, - "human_feedback": received_human_feedback, - "source": step.agent, - }, - ) async def _execute_step(self, session_id: str, step: Step): """ Executes the given step by sending an ActionRequest to the appropriate agent. """ - # Update step status to 'action_requested' - step.status = StepStatus.action_requested - await self._memory_store.update_step(step) - track_event_if_configured( - f"{AgentType.GROUP_CHAT_MANAGER.value} - Update step to action_requested and updated into the cosmos", - { - "status": StepStatus.action_requested, - "session_id": step.session_id, - "user_id": self._user_id, - "source": step.agent, - }, - ) + lock = step_execution_locks.setdefault(step.id, asyncio.Lock()) - # generate conversation history for the invoked agent - plan = await self._memory_store.get_plan_by_session(session_id=session_id) - steps: List[Step] = await self._memory_store.get_steps_by_plan(plan.id) - - current_step_id = step.id - # Initialize the formatted string - formatted_string = "" - formatted_string += "Here is the conversation history so far for the current plan. This information may or may not be relevant to the step you have been asked to execute." - formatted_string += f"The user's task was:\n{plan.summary}\n\n" - formatted_string += ( - f" human_clarification_request:\n{plan.human_clarification_request}\n\n" - ) - formatted_string += ( - f" human_clarification_response:\n{plan.human_clarification_response}\n\n" - ) - formatted_string += ( - "The conversation between the previous agents so far is below:\n" - ) + async with lock: + # Always refresh the step from the DB + latest_step = await self._memory_store.get_step(step.id, session_id) + working_step = latest_step if latest_step else step - # Iterate over the steps until the current_step_id - for i, step in enumerate(steps): - if step.id == current_step_id: - break - formatted_string += f"Step {i}\n" - formatted_string += f"{AgentType.GROUP_CHAT_MANAGER.value}: {step.action}\n" - formatted_string += f"{step.agent.value}: {step.agent_reply}\n" - formatted_string += "" + # ✅ Skip only if step already completed + if working_step.status == StepStatus.completed: + logging.info(f"[SKIP] Step {step.id} already completed.") + return - logging.info(f"Formatted string: {formatted_string}") + # ✅ Prevent re-entry for in-progress steps + if working_step.status == StepStatus.action_requested: + logging.info(f"[SKIP] Step {step.id} already in progress.") + return - action_with_history = f"{formatted_string}. Here is the step to action: {step.action}. ONLY perform the steps and actions required to complete this specific step, the other steps have already been completed. Only use the conversational history for additional information, if it's required to complete the step you have been assigned." + # ✅ Mark step as 'action_requested' to block duplicates + working_step.status = StepStatus.action_requested + await self._memory_store.update_step(working_step) - # Send action request to the appropriate agent - action_request = ActionRequest( - step_id=step.id, - plan_id=step.plan_id, - session_id=session_id, - action=action_with_history, - agent=step.agent, - ) - logging.info(f"Sending ActionRequest to {step.agent.value}") + track_event_if_configured( + f"{AgentType.GROUP_CHAT_MANAGER.value} - Step locked for execution", + { + "step_id": working_step.id, + "status": StepStatus.action_requested, + "session_id": session_id, + "user_id": self._user_id, + }, + ) - if step.agent != "": - agent_name = step.agent.value - formatted_agent = agent_name.replace("_", " ") - else: - raise ValueError(f"Check {step.agent} is missing") + # Generate conversation history for the invoked agent + plan = await self._memory_store.get_plan_by_session(session_id=session_id) + steps: List[Step] = await self._memory_store.get_steps_by_plan(plan.id) + + current_step_id = working_step.id + # Initialize the formatted string + formatted_string = "" + formatted_string += "Here is the conversation history so far for the current plan. This information may or may not be relevant to the step you have been asked to execute." + formatted_string += f"The user's task was:\n{plan.summary}\n\n" + formatted_string += ( + f" human_clarification_request:\n{plan.human_clarification_request}\n\n" + ) + formatted_string += ( + f" human_clarification_response:\n{plan.human_clarification_response}\n\n" + ) + formatted_string += ( + "The conversation between the previous agents so far is below:\n" + ) - await self._memory_store.add_item( - AgentMessage( + # Iterate over the steps until the current_step_id + for i, step_item in enumerate(steps): + if step_item.id == current_step_id: + break + formatted_string += f"Step {i}\n" + formatted_string += f"{AgentType.GROUP_CHAT_MANAGER.value}: {step_item.action}\n" + formatted_string += f"{step_item.agent.value}: {step_item.agent_reply}\n" + formatted_string += "" + + action_with_history = f"{formatted_string}. Here is the step to action: {working_step.action}. ONLY perform the steps and actions required to complete this specific step, the other steps have already been completed. Only use the conversational history for additional information, if it's required to complete the step you have been assigned." + + # Create ActionRequest + action_request = ActionRequest( + step_id=working_step.id, + plan_id=working_step.plan_id, session_id=session_id, - user_id=self._user_id, - plan_id=step.plan_id, - content=f"Requesting {formatted_agent} to perform action: {step.action}", - source=AgentType.GROUP_CHAT_MANAGER.value, - step_id=step.id, + action=action_with_history, + agent=working_step.agent, + ) + logging.info(f"Sending ActionRequest to {working_step.agent.value}") + + if working_step.agent != "": + agent_name = working_step.agent.value + formatted_agent = agent_name.replace("_", " ") + else: + raise ValueError(f"Check {working_step.agent} is missing") + + await self._memory_store.add_item( + AgentMessage( + session_id=session_id, + user_id=self._user_id, + plan_id=working_step.plan_id, + content=f"Requesting {formatted_agent} to perform action: {working_step.action}", + source=AgentType.GROUP_CHAT_MANAGER.value, + step_id=working_step.id, + ) ) - ) - track_event_if_configured( - f"{AgentType.GROUP_CHAT_MANAGER.value} - Requesting {formatted_agent} to perform the action and added into the cosmos", - { - "session_id": session_id, - "user_id": self._user_id, - "plan_id": step.plan_id, - "content": f"Requesting {formatted_agent} to perform action: {step.action}", - "source": AgentType.GROUP_CHAT_MANAGER.value, - "step_id": step.id, - }, - ) + if working_step.agent == AgentType.HUMAN.value: + # we mark the step as complete since we have received the human feedback + working_step.status = StepStatus.completed + await self._memory_store.update_step(working_step) + logging.info( + "Marking the step as complete - Since we have received the human feedback" + ) + else: + # Use the agent from the step to determine which agent to send to + agent = self._agent_instances[working_step.agent.value] + await agent.handle_action_request(action_request) + + # ✅ Mark completed after successful execution + working_step.status = StepStatus.completed + await self._memory_store.update_step(working_step) + logging.info(f"Sent ActionRequest to {working_step.agent.value}") - if step.agent == AgentType.HUMAN.value: - # we mark the step as complete since we have received the human feedback - # Update step status to 'completed' - step.status = StepStatus.completed - await self._memory_store.update_step(step) - logging.info( - "Marking the step as complete - Since we have received the human feedback" - ) track_event_if_configured( - "Group Chat Manager - Steps completed - Received the human feedback and updated into the cosmos", + f"{AgentType.GROUP_CHAT_MANAGER.value} - Step Executed", { + "step_id": working_step.id, + "status": StepStatus.completed, + "agent": working_step.agent.value, "session_id": session_id, "user_id": self._user_id, - "plan_id": step.plan_id, - "content": "Marking the step as complete - Since we have received the human feedback", - "source": step.agent, - "step_id": step.id, }, ) - else: - # Use the agent from the step to determine which agent to send to - agent = self._agent_instances[step.agent.value] - await agent.handle_action_request( - action_request - ) # this function is in base_agent.py - logging.info(f"Sent ActionRequest to {step.agent.value}") From 67dd020847e6398982b3f124712dc26869356d78 Mon Sep 17 00:00:00 2001 From: UtkarshMishra-Microsoft Date: Thu, 10 Jul 2025 19:48:39 +0530 Subject: [PATCH 2/3] Pylint_issue --- src/backend/kernel_agents/group_chat_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backend/kernel_agents/group_chat_manager.py b/src/backend/kernel_agents/group_chat_manager.py index be7b2f6df..d8c470913 100644 --- a/src/backend/kernel_agents/group_chat_manager.py +++ b/src/backend/kernel_agents/group_chat_manager.py @@ -418,7 +418,7 @@ async def _execute_step(self, session_id: str, step: Step): # Use the agent from the step to determine which agent to send to agent = self._agent_instances[working_step.agent.value] await agent.handle_action_request(action_request) - + # ✅ Mark completed after successful execution working_step.status = StepStatus.completed await self._memory_store.update_step(working_step) From cce476b0fc2ecce1e54f141a9361fc77f4c9e400 Mon Sep 17 00:00:00 2001 From: UtkarshMishra-Microsoft Date: Thu, 10 Jul 2025 19:54:17 +0530 Subject: [PATCH 3/3] Updated file --- src/backend/kernel_agents/group_chat_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backend/kernel_agents/group_chat_manager.py b/src/backend/kernel_agents/group_chat_manager.py index d8c470913..b08dc0f13 100644 --- a/src/backend/kernel_agents/group_chat_manager.py +++ b/src/backend/kernel_agents/group_chat_manager.py @@ -13,7 +13,7 @@ # pylint: disable=E0611 from semantic_kernel.functions.kernel_function import KernelFunction -# Module-level dictionary to track step execution locks +# Module-level dictionary to the track step execution locks step_execution_locks: Dict[str, asyncio.Lock] = {}