Skip to content

Commit 2a36ee5

Browse files
reasoning streaming llm
1 parent 0a644f6 commit 2a36ee5

File tree

4 files changed

+179
-3
lines changed

4 files changed

+179
-3
lines changed

src/backend/app_kernel.py

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -799,6 +799,180 @@ async def approve_step_endpoint(
799799
return {"status": "All steps approved"}
800800

801801

802+
@app.post("/api/human_clarification_on_plan_stream")
803+
async def human_clarification_stream_endpoint(
804+
human_clarification: HumanClarification, request: Request
805+
):
806+
"""
807+
Receive human clarification on a plan with streaming agent responses.
808+
"""
809+
authenticated_user = get_authenticated_user_details(request_headers=request.headers)
810+
user_id = authenticated_user["user_principal_id"]
811+
if not user_id:
812+
raise HTTPException(status_code=400, detail="no user")
813+
814+
async def generate_clarification_stream():
815+
try:
816+
# Initialize agents and memory store
817+
kernel, memory_store = await initialize_runtime_and_context(
818+
human_clarification.session_id, user_id
819+
)
820+
client = None
821+
try:
822+
client = config.get_ai_project_client()
823+
except Exception as client_exc:
824+
logging.error(f"Error creating AIProjectClient: {client_exc}")
825+
826+
# Create human agent
827+
human_agent = await AgentFactory.create_agent(
828+
agent_type=AgentType.HUMAN,
829+
session_id=human_clarification.session_id,
830+
user_id=user_id,
831+
memory_store=memory_store,
832+
client=client,
833+
)
834+
835+
if human_agent is None:
836+
yield "data: Failed to create human agent\n\n"
837+
return
838+
839+
# Stream initial processing message
840+
yield "data: **Human Agent:** Processing your clarification...\n\n"
841+
await asyncio.sleep(0.5)
842+
843+
# Process the clarification
844+
result = await human_agent.handle_human_clarification(
845+
human_clarification=human_clarification
846+
)
847+
848+
# Stream planner response
849+
yield "data: **Planner Agent:** Thanks, the plan has been updated based on your clarification.\n\n"
850+
await asyncio.sleep(0.5)
851+
852+
# Stream completion message
853+
yield "data: **System:** Plan clarification processed. You can now approve individual steps.\n\n"
854+
yield "data: [DONE]\n\n"
855+
856+
if client:
857+
try:
858+
client.close()
859+
except Exception as e:
860+
logging.error(f"Error closing AIProjectClient: {e}")
861+
862+
except Exception as e:
863+
logging.error(f"Error in clarification streaming: {e}")
864+
yield f"data: Error processing clarification: {str(e)}\n\n"
865+
yield "data: [DONE]\n\n"
866+
867+
return StreamingResponse(
868+
generate_clarification_stream(),
869+
media_type="text/event-stream",
870+
headers={
871+
"Cache-Control": "no-cache",
872+
"Connection": "keep-alive",
873+
"Access-Control-Allow-Origin": "*",
874+
},
875+
)
876+
877+
878+
@app.post("/api/approve_step_or_steps_stream")
879+
async def approve_step_stream_endpoint(
880+
human_feedback: HumanFeedback, request: Request
881+
):
882+
"""
883+
Approve a step or multiple steps with streaming agent responses.
884+
"""
885+
authenticated_user = get_authenticated_user_details(request_headers=request.headers)
886+
user_id = authenticated_user["user_principal_id"]
887+
if not user_id:
888+
raise HTTPException(status_code=400, detail="no user")
889+
890+
async def generate_approval_stream():
891+
try:
892+
# Initialize agents and memory store
893+
kernel, memory_store = await initialize_runtime_and_context(
894+
human_feedback.session_id, user_id
895+
)
896+
client = None
897+
try:
898+
client = config.get_ai_project_client()
899+
except Exception as client_exc:
900+
logging.error(f"Error creating AIProjectClient: {client_exc}")
901+
902+
# Create all agents
903+
agents = await AgentFactory.create_all_agents(
904+
session_id=human_feedback.session_id,
905+
user_id=user_id,
906+
memory_store=memory_store,
907+
client=client,
908+
)
909+
910+
group_chat_manager = agents[AgentType.GROUP_CHAT_MANAGER.value]
911+
912+
# Stream initial processing message
913+
action = "Approving" if human_feedback.approved else "Rejecting"
914+
yield f"data: **Group Chat Manager:** {action} step and coordinating with relevant agents...\n\n"
915+
await asyncio.sleep(0.5)
916+
917+
# Process the approval
918+
await group_chat_manager.handle_human_feedback(human_feedback)
919+
920+
# Get the updated step to see what agent was assigned
921+
updated_steps = await memory_store.get_steps_by_plan(human_feedback.plan_id)
922+
target_step = next((s for s in updated_steps if s.id == human_feedback.step_id), None)
923+
924+
# Stream agent responses based on step
925+
if human_feedback.approved:
926+
yield "data: **Human Agent:** Step approved successfully.\n\n"
927+
await asyncio.sleep(0.5)
928+
yield "data: **Group Chat Manager:** Step has been marked as approved and will be executed.\n\n"
929+
await asyncio.sleep(0.5)
930+
931+
if target_step and target_step.agent:
932+
# Show which agent is handling the step
933+
agent_name = target_step.agent.replace('_', ' ').title()
934+
yield f"data: **Group Chat Manager:** Assigning step to {agent_name} for execution...\n\n"
935+
await asyncio.sleep(0.5)
936+
937+
# Simulate detailed agent planning/execution response
938+
yield f"data: **{agent_name}:** Analyzing step requirements...\n\n"
939+
await asyncio.sleep(0.5)
940+
yield f"data: **{agent_name}:** Creating detailed execution plan for: {target_step.action}\n\n"
941+
await asyncio.sleep(0.5)
942+
yield f"data: **{agent_name}:** ✅ Step execution plan prepared. Ready to proceed when all approvals are complete.\n\n"
943+
await asyncio.sleep(0.5)
944+
yield f"data: **Group Chat Manager:** {agent_name} has confirmed step readiness. Step is now marked as approved.\n\n"
945+
else:
946+
yield "data: **Human Agent:** Step has been rejected.\n\n"
947+
await asyncio.sleep(0.5)
948+
yield "data: **Group Chat Manager:** Step marked as rejected. Plan updated accordingly.\n\n"
949+
950+
await asyncio.sleep(0.5)
951+
yield "data: **System:** Step approval processed successfully.\n\n"
952+
yield "data: [DONE]\n\n"
953+
954+
if client:
955+
try:
956+
client.close()
957+
except Exception as e:
958+
logging.error(f"Error closing AIProjectClient: {e}")
959+
960+
except Exception as e:
961+
logging.error(f"Error in approval streaming: {e}")
962+
yield f"data: Error processing approval: {str(e)}\n\n"
963+
yield "data: [DONE]\n\n"
964+
965+
return StreamingResponse(
966+
generate_approval_stream(),
967+
media_type="text/event-stream",
968+
headers={
969+
"Cache-Control": "no-cache",
970+
"Connection": "keep-alive",
971+
"Access-Control-Allow-Origin": "*",
972+
},
973+
)
974+
975+
802976
@app.get("/api/plans")
803977
async def get_plans(
804978
request: Request,

src/backend/kernel_agents/group_chat_manager.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -309,12 +309,12 @@ async def _update_step_status(
309309
step.human_approval_status = HumanFeedbackStatus.rejected
310310

311311
step.human_feedback = received_human_feedback
312-
step.status = StepStatus.completed
312+
# Don't set status to completed here - keep the approved/rejected status
313313
await self._memory_store.update_step(step)
314314
track_event_if_configured(
315315
f"{AgentType.GROUP_CHAT_MANAGER.value} - Received human feedback, Updating step and updated into the cosmos",
316316
{
317-
"status": StepStatus.completed,
317+
"status": step.status, # Use the actual status (approved/rejected)
318318
"session_id": step.session_id,
319319
"user_id": self._user_id,
320320
"human_feedback": received_human_feedback,

src/backend/kernel_agents/planner_agent.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,7 @@ async def _create_structured_plan(
370370
logging.exception(f"Error during parsing attempts: {parsing_exception}")
371371
raise ValueError("Failed to parse JSON response")
372372

373-
# At this point, we have a valid parsed_result
373+
374374

375375
# Extract plan details
376376
initial_goal = parsed_result.initial_goal

src/backend/utils_kernel.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -496,6 +496,7 @@ async def generate_plan_with_reasoning_stream(task_description: str, plan_id: st
496496
plan_summary = plan_data.get("plan", {}).get("summary", "Plan created successfully")
497497
plan.summary = plan_summary
498498
plan.overall_status = PlanStatus.in_progress
499+
499500
await memory_store.update_plan(plan)
500501

501502
# Single processing message with all the info
@@ -564,6 +565,7 @@ async def generate_plan_with_reasoning_stream(task_description: str, plan_id: st
564565
if plan:
565566
plan.summary = "Plan created but could not parse detailed steps from response"
566567
plan.overall_status = PlanStatus.in_progress
568+
567569
await memory_store.update_plan(plan)
568570

569571
# Create a basic step

0 commit comments

Comments
 (0)