Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/backend/agents/base_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
Step,
StepStatus,
)
from azure.monitor.events.extension import track_event
from event_utils import track_event_if_configured


class BaseAgent(RoutedAgent):
Expand Down Expand Up @@ -105,7 +105,7 @@ async def handle_action_request(
)
)

track_event(
track_event_if_configured(
"Base agent - Added into the cosmos",
{
"session_id": message.session_id,
Expand All @@ -119,7 +119,7 @@ async def handle_action_request(

except Exception as e:
logging.exception(f"Error during LLM call: {e}")
track_event(
track_event_if_configured(
"Base agent - Error during llm call, captured into the cosmos",
{
"session_id": message.session_id,
Expand All @@ -138,7 +138,7 @@ async def handle_action_request(
step.agent_reply = result
await self._model_context.update_step(step)

track_event(
track_event_if_configured(
"Base agent - Updated step and updated into the cosmos",
{
"status": StepStatus.completed,
Expand Down
16 changes: 8 additions & 8 deletions src/backend/agents/group_chat_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
StepStatus,
)

from azure.monitor.events.extension import track_event
from event_utils import track_event_if_configured


@default_subscription
Expand Down Expand Up @@ -62,7 +62,7 @@ async def handle_input_task(
)
)

track_event(
track_event_if_configured(
"Group Chat Manager - Received and added input task into the cosmos",
{
"session_id": message.session_id,
Expand Down Expand Up @@ -164,7 +164,7 @@ class Step(BaseDataModel):
step.status = StepStatus.rejected
step.human_approval_status = HumanFeedbackStatus.rejected
self._memory.update_step(step)
track_event(
track_event_if_configured(
"Group Chat Manager - Steps has been rejected and updated into the cosmos",
{
"status": StepStatus.rejected,
Expand All @@ -188,7 +188,7 @@ class Step(BaseDataModel):
step.status = StepStatus.rejected
step.human_approval_status = HumanFeedbackStatus.rejected
self._memory.update_step(step)
track_event(
track_event_if_configured(
"Group Chat Manager - Step has been rejected and updated into the cosmos",
{
"status": StepStatus.rejected,
Expand All @@ -213,7 +213,7 @@ async def _update_step_status(
step.human_feedback = received_human_feedback
step.status = StepStatus.completed
await self._memory.update_step(step)
track_event(
track_event_if_configured(
"Group Chat Manager - Received human feedback, Updating step and updated into the cosmos",
{
"status": StepStatus.completed,
Expand Down Expand Up @@ -241,7 +241,7 @@ async def _execute_step(self, session_id: str, step: Step):
# Update step status to 'action_requested'
step.status = StepStatus.action_requested
await self._memory.update_step(step)
track_event(
track_event_if_configured(
"Group Chat Manager - Update step to action_requested and updated into the cosmos",
{
"status": StepStatus.action_requested,
Expand Down Expand Up @@ -304,7 +304,7 @@ async def _execute_step(self, session_id: str, step: Step):
)
)

track_event(
track_event_if_configured(
f"Group Chat Manager - Requesting {step.agent.value.title()} to perform the action and added into the cosmos",
{
"session_id": session_id,
Expand Down Expand Up @@ -338,7 +338,7 @@ async def _execute_step(self, session_id: str, step: Step):
logging.info(
"Marking the step as complete - Since we have received the human feedback"
)
track_event(
track_event_if_configured(
"Group Chat Manager - Steps completed - Received the human feedback and updated into the cosmos",
{
"session_id": session_id,
Expand Down
6 changes: 3 additions & 3 deletions src/backend/agents/human.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
AgentMessage,
Step,
)
from azure.monitor.events.extension import track_event
from event_utils import track_event_if_configured


@default_subscription
Expand Down Expand Up @@ -57,7 +57,7 @@ async def handle_step_feedback(
)
)
logging.info(f"HumanAgent received feedback for step: {step}")
track_event(
track_event_if_configured(
f"Human Agent - Received feedback for step: {step} and added into the cosmos",
{
"session_id": message.session_id,
Expand All @@ -81,7 +81,7 @@ async def handle_step_feedback(
)
logging.info(f"HumanAgent sent approval request for step: {step}")

track_event(
track_event_if_configured(
f"Human Agent - Approval request sent for step {step} and added into the cosmos",
{
"session_id": message.session_id,
Expand Down
18 changes: 9 additions & 9 deletions src/backend/agents/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
HumanFeedbackStatus,
)

from azure.monitor.events.extension import track_event
from event_utils import track_event_if_configured


@default_subscription
Expand Down Expand Up @@ -74,7 +74,7 @@ async def handle_input_task(self, message: InputTask, ctx: MessageContext) -> Pl
)
logging.info(f"Plan generated: {plan.summary}")

track_event(
track_event_if_configured(
f"Planner - Generated a plan with {len(steps)} steps and added plan into the cosmos",
{
"session_id": message.session_id,
Expand All @@ -101,7 +101,7 @@ async def handle_input_task(self, message: InputTask, ctx: MessageContext) -> Pl
f"Additional information requested: {plan.human_clarification_request}"
)

track_event(
track_event_if_configured(
"Planner - Additional information requested and added into the cosmos",
{
"session_id": message.session_id,
Expand Down Expand Up @@ -138,7 +138,7 @@ async def handle_plan_clarification(
)
)

track_event(
track_event_if_configured(
"Planner - Store HumanAgent clarification and added into the cosmos",
{
"session_id": message.session_id,
Expand All @@ -160,7 +160,7 @@ async def handle_plan_clarification(
)
logging.info("Plan updated with HumanClarification.")

track_event(
track_event_if_configured(
"Planner - Updated with HumanClarification and added into the cosmos",
{
"session_id": message.session_id,
Expand Down Expand Up @@ -254,7 +254,7 @@ class StructuredOutputPlan(BaseModel):
structured_plan = StructuredOutputPlan(**parsed_result)

if not structured_plan.steps:
track_event(
track_event_if_configured(
"Planner agent - No steps found",
{
"session_id": self._session_id,
Expand Down Expand Up @@ -282,7 +282,7 @@ class StructuredOutputPlan(BaseModel):
# Store the plan in memory
await self._memory.add_plan(plan)

track_event(
track_event_if_configured(
"Planner - Initial plan and added into the cosmos",
{
"session_id": self._session_id,
Expand All @@ -308,7 +308,7 @@ class StructuredOutputPlan(BaseModel):
human_approval_status=HumanFeedbackStatus.requested,
)
await self._memory.add_step(step)
track_event(
track_event_if_configured(
"Planner - Added planned individual step into the cosmos",
{
"plan_id": plan.id,
Expand All @@ -326,7 +326,7 @@ class StructuredOutputPlan(BaseModel):

except Exception as e:
logging.exception(f"Error in create_structured_plan: {e}")
track_event(
track_event_if_configured(
f"Planner - Error in create_structured_plan: {e} into the cosmos",
{
"session_id": self._session_id,
Expand Down
43 changes: 25 additions & 18 deletions src/backend/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,20 @@
PlanWithSteps,
)
from utils import initialize_runtime_and_context, retrieve_all_agent_tools, rai_success
from event_utils import track_event_if_configured
from fastapi.middleware.cors import CORSMiddleware
from azure.monitor.opentelemetry import configure_azure_monitor
from azure.monitor.events.extension import track_event

configure_azure_monitor(
connection_string=os.getenv("APPLICATIONINSIGHTS_INSTRUMENTATION_KEY")
)

# Check if the Application Insights Instrumentation Key is set in the environment variables
instrumentation_key = os.getenv("APPLICATIONINSIGHTS_INSTRUMENTATION_KEY")
if instrumentation_key:
# Configure Application Insights if the Instrumentation Key is found
configure_azure_monitor(connection_string=instrumentation_key)
logging.info("Application Insights configured with the provided Instrumentation Key")
else:
# Log a warning if the Instrumentation Key is not found
logging.warning("No Application Insights Instrumentation Key found. Skipping configuration")

# Configure logging
logging.basicConfig(level=logging.INFO)
Expand Down Expand Up @@ -113,7 +120,7 @@ async def input_task_endpoint(input_task: InputTask, request: Request):
if not rai_success(input_task.description):
print("RAI failed")

track_event(
track_event_if_configured(
"RAI failed",
{
"status": "Plan not created",
Expand All @@ -129,7 +136,7 @@ async def input_task_endpoint(input_task: InputTask, request: Request):
user_id = authenticated_user["user_principal_id"]

if not user_id:
track_event("UserIdNotFound", {"status_code": 400, "detail": "no user"})
track_event_if_configured("UserIdNotFound", {"status_code": 400, "detail": "no user"})

raise HTTPException(status_code=400, detail="no user")
if not input_task.session_id:
Expand All @@ -150,7 +157,7 @@ async def input_task_endpoint(input_task: InputTask, request: Request):
logging.info(f"Plan created: {plan.summary}")

# Log custom event for successful input task processing
track_event(
track_event_if_configured(
"InputTaskProcessed",
{
"status": f"Plan created:\n {plan.summary}"
Expand Down Expand Up @@ -231,7 +238,7 @@ async def human_feedback_endpoint(human_feedback: HumanFeedback, request: Reques
authenticated_user = get_authenticated_user_details(request_headers=request.headers)
user_id = authenticated_user["user_principal_id"]
if not user_id:
track_event("UserIdNotFound", {"status_code": 400, "detail": "no user"})
track_event_if_configured("UserIdNotFound", {"status_code": 400, "detail": "no user"})
raise HTTPException(status_code=400, detail="no user")
# Initialize runtime and context
runtime, _ = await initialize_runtime_and_context(
Expand All @@ -242,7 +249,7 @@ async def human_feedback_endpoint(human_feedback: HumanFeedback, request: Reques
human_agent_id = AgentId("human_agent", human_feedback.session_id)
await runtime.send_message(human_feedback, human_agent_id)

track_event(
track_event_if_configured(
"Completed Feedback received",
{
"status": "Feedback received",
Expand Down Expand Up @@ -308,7 +315,7 @@ async def human_clarification_endpoint(
authenticated_user = get_authenticated_user_details(request_headers=request.headers)
user_id = authenticated_user["user_principal_id"]
if not user_id:
track_event("UserIdNotFound", {"status_code": 400, "detail": "no user"})
track_event_if_configured("UserIdNotFound", {"status_code": 400, "detail": "no user"})
raise HTTPException(status_code=400, detail="no user")
# Initialize runtime and context
runtime, _ = await initialize_runtime_and_context(
Expand All @@ -319,7 +326,7 @@ async def human_clarification_endpoint(
planner_agent_id = AgentId("planner_agent", human_clarification.session_id)
await runtime.send_message(human_clarification, planner_agent_id)

track_event(
track_event_if_configured(
"Completed Human clarification on the plan",
{
"status": "Clarification received",
Expand Down Expand Up @@ -390,7 +397,7 @@ async def approve_step_endpoint(
authenticated_user = get_authenticated_user_details(request_headers=request.headers)
user_id = authenticated_user["user_principal_id"]
if not user_id:
track_event("UserIdNotFound", {"status_code": 400, "detail": "no user"})
track_event_if_configured("UserIdNotFound", {"status_code": 400, "detail": "no user"})
raise HTTPException(status_code=400, detail="no user")
# Initialize runtime and context
runtime, _ = await initialize_runtime_and_context(user_id=user_id)
Expand All @@ -405,7 +412,7 @@ async def approve_step_endpoint(
)
# Return a status message
if human_feedback.step_id:
track_event(
track_event_if_configured(
"Completed Human clarification with step_id",
{
"status": f"Step {human_feedback.step_id} - Approval:{human_feedback.approved}."
Expand All @@ -416,7 +423,7 @@ async def approve_step_endpoint(
"status": f"Step {human_feedback.step_id} - Approval:{human_feedback.approved}."
}
else:
track_event(
track_event_if_configured(
"Completed Human clarification without step_id",
{"status": "All steps approved"},
)
Expand Down Expand Up @@ -488,15 +495,15 @@ async def get_plans(
authenticated_user = get_authenticated_user_details(request_headers=request.headers)
user_id = authenticated_user["user_principal_id"]
if not user_id:
track_event("UserIdNotFound", {"status_code": 400, "detail": "no user"})
track_event_if_configured("UserIdNotFound", {"status_code": 400, "detail": "no user"})
raise HTTPException(status_code=400, detail="no user")

cosmos = CosmosBufferedChatCompletionContext(session_id or "", user_id)

if session_id:
plan = await cosmos.get_plan_by_session(session_id=session_id)
if not plan:
track_event(
track_event_if_configured(
"GetPlanBySessionNotFound",
{"status_code": 400, "detail": "Plan not found"},
)
Expand Down Expand Up @@ -576,7 +583,7 @@ async def get_steps_by_plan(plan_id: str, request: Request) -> List[Step]:
authenticated_user = get_authenticated_user_details(request_headers=request.headers)
user_id = authenticated_user["user_principal_id"]
if not user_id:
track_event("UserIdNotFound", {"status_code": 400, "detail": "no user"})
track_event_if_configured("UserIdNotFound", {"status_code": 400, "detail": "no user"})
raise HTTPException(status_code=400, detail="no user")
cosmos = CosmosBufferedChatCompletionContext("", user_id)
steps = await cosmos.get_steps_by_plan(plan_id=plan_id)
Expand Down Expand Up @@ -634,7 +641,7 @@ async def get_agent_messages(session_id: str, request: Request) -> List[AgentMes
authenticated_user = get_authenticated_user_details(request_headers=request.headers)
user_id = authenticated_user["user_principal_id"]
if not user_id:
track_event("UserIdNotFound", {"status_code": 400, "detail": "no user"})
track_event_if_configured("UserIdNotFound", {"status_code": 400, "detail": "no user"})
raise HTTPException(status_code=400, detail="no user")
cosmos = CosmosBufferedChatCompletionContext(session_id, user_id)
agent_messages = await cosmos.get_data_by_type("agent_message")
Expand Down
11 changes: 11 additions & 0 deletions src/backend/event_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import logging
import os
from azure.monitor.events.extension import track_event


def track_event_if_configured(event_name: str, event_data: dict):
instrumentation_key = os.getenv("APPLICATIONINSIGHTS_INSTRUMENTATION_KEY")
if instrumentation_key:
track_event(event_name, event_data)
else:
logging.warning(f"Skipping track_event for {event_name} as Application Insights is not configured")
Loading