Skip to content

Commit 22c3b85

Browse files
Multiple-BackgroundCheck -Fix
1 parent 2dbe915 commit 22c3b85

File tree

1 file changed

+102
-104
lines changed

1 file changed

+102
-104
lines changed

src/backend/kernel_agents/group_chat_manager.py

Lines changed: 102 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import logging
23
from datetime import datetime
34
from typing import Dict, List, Optional
@@ -12,6 +13,9 @@
1213
# pylint: disable=E0611
1314
from semantic_kernel.functions.kernel_function import KernelFunction
1415

16+
# Module-level dictionary to track step execution locks
17+
step_execution_locks: Dict[str, asyncio.Lock] = {}
18+
1519

1620
class GroupChatManager(BaseAgent):
1721
"""GroupChatManager agent implementation using Semantic Kernel.
@@ -309,130 +313,124 @@ async def _update_step_status(
309313
step.human_approval_status = HumanFeedbackStatus.rejected
310314

311315
step.human_feedback = received_human_feedback
312-
step.status = StepStatus.completed
313316
await self._memory_store.update_step(step)
314-
track_event_if_configured(
315-
f"{AgentType.GROUP_CHAT_MANAGER.value} - Received human feedback, Updating step and updated into the cosmos",
316-
{
317-
"status": StepStatus.completed,
318-
"session_id": step.session_id,
319-
"user_id": self._user_id,
320-
"human_feedback": received_human_feedback,
321-
"source": step.agent,
322-
},
323-
)
324317

325318
async def _execute_step(self, session_id: str, step: Step):
326319
"""
327320
Executes the given step by sending an ActionRequest to the appropriate agent.
328321
"""
329-
# Update step status to 'action_requested'
330-
step.status = StepStatus.action_requested
331-
await self._memory_store.update_step(step)
332-
track_event_if_configured(
333-
f"{AgentType.GROUP_CHAT_MANAGER.value} - Update step to action_requested and updated into the cosmos",
334-
{
335-
"status": StepStatus.action_requested,
336-
"session_id": step.session_id,
337-
"user_id": self._user_id,
338-
"source": step.agent,
339-
},
340-
)
322+
lock = step_execution_locks.setdefault(step.id, asyncio.Lock())
341323

342-
# generate conversation history for the invoked agent
343-
plan = await self._memory_store.get_plan_by_session(session_id=session_id)
344-
steps: List[Step] = await self._memory_store.get_steps_by_plan(plan.id)
345-
346-
current_step_id = step.id
347-
# Initialize the formatted string
348-
formatted_string = ""
349-
formatted_string += "<conversation_history>Here is the conversation history so far for the current plan. This information may or may not be relevant to the step you have been asked to execute."
350-
formatted_string += f"The user's task was:\n{plan.summary}\n\n"
351-
formatted_string += (
352-
f" human_clarification_request:\n{plan.human_clarification_request}\n\n"
353-
)
354-
formatted_string += (
355-
f" human_clarification_response:\n{plan.human_clarification_response}\n\n"
356-
)
357-
formatted_string += (
358-
"The conversation between the previous agents so far is below:\n"
359-
)
324+
async with lock:
325+
# Always refresh the step from the DB
326+
latest_step = await self._memory_store.get_step(step.id, session_id)
327+
working_step = latest_step if latest_step else step
360328

361-
# Iterate over the steps until the current_step_id
362-
for i, step in enumerate(steps):
363-
if step.id == current_step_id:
364-
break
365-
formatted_string += f"Step {i}\n"
366-
formatted_string += f"{AgentType.GROUP_CHAT_MANAGER.value}: {step.action}\n"
367-
formatted_string += f"{step.agent.value}: {step.agent_reply}\n"
368-
formatted_string += "<conversation_history \\>"
329+
# ✅ Skip only if step already completed
330+
if working_step.status == StepStatus.completed:
331+
logging.info(f"[SKIP] Step {step.id} already completed.")
332+
return
369333

370-
logging.info(f"Formatted string: {formatted_string}")
334+
# ✅ Prevent re-entry for in-progress steps
335+
if working_step.status == StepStatus.action_requested:
336+
logging.info(f"[SKIP] Step {step.id} already in progress.")
337+
return
371338

372-
action_with_history = f"{formatted_string}. Here is the step to action: {step.action}. ONLY perform the steps and actions required to complete this specific step, the other steps have already been completed. Only use the conversational history for additional information, if it's required to complete the step you have been assigned."
339+
# ✅ Mark step as 'action_requested' to block duplicates
340+
working_step.status = StepStatus.action_requested
341+
await self._memory_store.update_step(working_step)
373342

374-
# Send action request to the appropriate agent
375-
action_request = ActionRequest(
376-
step_id=step.id,
377-
plan_id=step.plan_id,
378-
session_id=session_id,
379-
action=action_with_history,
380-
agent=step.agent,
381-
)
382-
logging.info(f"Sending ActionRequest to {step.agent.value}")
343+
track_event_if_configured(
344+
f"{AgentType.GROUP_CHAT_MANAGER.value} - Step locked for execution",
345+
{
346+
"step_id": working_step.id,
347+
"status": StepStatus.action_requested,
348+
"session_id": session_id,
349+
"user_id": self._user_id,
350+
},
351+
)
383352

384-
if step.agent != "":
385-
agent_name = step.agent.value
386-
formatted_agent = agent_name.replace("_", " ")
387-
else:
388-
raise ValueError(f"Check {step.agent} is missing")
353+
# Generate conversation history for the invoked agent
354+
plan = await self._memory_store.get_plan_by_session(session_id=session_id)
355+
steps: List[Step] = await self._memory_store.get_steps_by_plan(plan.id)
356+
357+
current_step_id = working_step.id
358+
# Initialize the formatted string
359+
formatted_string = ""
360+
formatted_string += "<conversation_history>Here is the conversation history so far for the current plan. This information may or may not be relevant to the step you have been asked to execute."
361+
formatted_string += f"The user's task was:\n{plan.summary}\n\n"
362+
formatted_string += (
363+
f" human_clarification_request:\n{plan.human_clarification_request}\n\n"
364+
)
365+
formatted_string += (
366+
f" human_clarification_response:\n{plan.human_clarification_response}\n\n"
367+
)
368+
formatted_string += (
369+
"The conversation between the previous agents so far is below:\n"
370+
)
389371

390-
await self._memory_store.add_item(
391-
AgentMessage(
372+
# Iterate over the steps until the current_step_id
373+
for i, step_item in enumerate(steps):
374+
if step_item.id == current_step_id:
375+
break
376+
formatted_string += f"Step {i}\n"
377+
formatted_string += f"{AgentType.GROUP_CHAT_MANAGER.value}: {step_item.action}\n"
378+
formatted_string += f"{step_item.agent.value}: {step_item.agent_reply}\n"
379+
formatted_string += "<conversation_history \\>"
380+
381+
action_with_history = f"{formatted_string}. Here is the step to action: {working_step.action}. ONLY perform the steps and actions required to complete this specific step, the other steps have already been completed. Only use the conversational history for additional information, if it's required to complete the step you have been assigned."
382+
383+
# Create ActionRequest
384+
action_request = ActionRequest(
385+
step_id=working_step.id,
386+
plan_id=working_step.plan_id,
392387
session_id=session_id,
393-
user_id=self._user_id,
394-
plan_id=step.plan_id,
395-
content=f"Requesting {formatted_agent} to perform action: {step.action}",
396-
source=AgentType.GROUP_CHAT_MANAGER.value,
397-
step_id=step.id,
388+
action=action_with_history,
389+
agent=working_step.agent,
390+
)
391+
logging.info(f"Sending ActionRequest to {working_step.agent.value}")
392+
393+
if working_step.agent != "":
394+
agent_name = working_step.agent.value
395+
formatted_agent = agent_name.replace("_", " ")
396+
else:
397+
raise ValueError(f"Check {working_step.agent} is missing")
398+
399+
await self._memory_store.add_item(
400+
AgentMessage(
401+
session_id=session_id,
402+
user_id=self._user_id,
403+
plan_id=working_step.plan_id,
404+
content=f"Requesting {formatted_agent} to perform action: {working_step.action}",
405+
source=AgentType.GROUP_CHAT_MANAGER.value,
406+
step_id=working_step.id,
407+
)
398408
)
399-
)
400409

401-
track_event_if_configured(
402-
f"{AgentType.GROUP_CHAT_MANAGER.value} - Requesting {formatted_agent} to perform the action and added into the cosmos",
403-
{
404-
"session_id": session_id,
405-
"user_id": self._user_id,
406-
"plan_id": step.plan_id,
407-
"content": f"Requesting {formatted_agent} to perform action: {step.action}",
408-
"source": AgentType.GROUP_CHAT_MANAGER.value,
409-
"step_id": step.id,
410-
},
411-
)
410+
if working_step.agent == AgentType.HUMAN.value:
411+
# we mark the step as complete since we have received the human feedback
412+
working_step.status = StepStatus.completed
413+
await self._memory_store.update_step(working_step)
414+
logging.info(
415+
"Marking the step as complete - Since we have received the human feedback"
416+
)
417+
else:
418+
# Use the agent from the step to determine which agent to send to
419+
agent = self._agent_instances[working_step.agent.value]
420+
await agent.handle_action_request(action_request)
421+
422+
# ✅ Mark completed after successful execution
423+
working_step.status = StepStatus.completed
424+
await self._memory_store.update_step(working_step)
425+
logging.info(f"Sent ActionRequest to {working_step.agent.value}")
412426

413-
if step.agent == AgentType.HUMAN.value:
414-
# we mark the step as complete since we have received the human feedback
415-
# Update step status to 'completed'
416-
step.status = StepStatus.completed
417-
await self._memory_store.update_step(step)
418-
logging.info(
419-
"Marking the step as complete - Since we have received the human feedback"
420-
)
421427
track_event_if_configured(
422-
"Group Chat Manager - Steps completed - Received the human feedback and updated into the cosmos",
428+
f"{AgentType.GROUP_CHAT_MANAGER.value} - Step Executed",
423429
{
430+
"step_id": working_step.id,
431+
"status": StepStatus.completed,
432+
"agent": working_step.agent.value,
424433
"session_id": session_id,
425434
"user_id": self._user_id,
426-
"plan_id": step.plan_id,
427-
"content": "Marking the step as complete - Since we have received the human feedback",
428-
"source": step.agent,
429-
"step_id": step.id,
430435
},
431436
)
432-
else:
433-
# Use the agent from the step to determine which agent to send to
434-
agent = self._agent_instances[step.agent.value]
435-
await agent.handle_action_request(
436-
action_request
437-
) # this function is in base_agent.py
438-
logging.info(f"Sent ActionRequest to {step.agent.value}")

0 commit comments

Comments
 (0)