diff --git a/src/backend/agents/base_agent.py b/src/backend/agents/base_agent.py index a8c90439d..23541f83c 100644 --- a/src/backend/agents/base_agent.py +++ b/src/backend/agents/base_agent.py @@ -21,7 +21,7 @@ Step, StepStatus, ) -from azure.monitor.events.extension import track_event +from event_utils import track_event_if_configured class BaseAgent(RoutedAgent): @@ -105,7 +105,7 @@ async def handle_action_request( ) ) - track_event( + track_event_if_configured( "Base agent - Added into the cosmos", { "session_id": message.session_id, @@ -119,7 +119,7 @@ async def handle_action_request( except Exception as e: logging.exception(f"Error during LLM call: {e}") - track_event( + track_event_if_configured( "Base agent - Error during llm call, captured into the cosmos", { "session_id": message.session_id, @@ -138,7 +138,7 @@ async def handle_action_request( step.agent_reply = result await self._model_context.update_step(step) - track_event( + track_event_if_configured( "Base agent - Updated step and updated into the cosmos", { "status": StepStatus.completed, diff --git a/src/backend/agents/group_chat_manager.py b/src/backend/agents/group_chat_manager.py index e4a36aab5..101b643f1 100644 --- a/src/backend/agents/group_chat_manager.py +++ b/src/backend/agents/group_chat_manager.py @@ -22,7 +22,7 @@ StepStatus, ) -from azure.monitor.events.extension import track_event +from event_utils import track_event_if_configured @default_subscription @@ -62,7 +62,7 @@ async def handle_input_task( ) ) - track_event( + track_event_if_configured( "Group Chat Manager - Received and added input task into the cosmos", { "session_id": message.session_id, @@ -164,7 +164,7 @@ class Step(BaseDataModel): step.status = StepStatus.rejected step.human_approval_status = HumanFeedbackStatus.rejected self._memory.update_step(step) - track_event( + track_event_if_configured( "Group Chat Manager - Steps has been rejected and updated into the cosmos", { "status": StepStatus.rejected, @@ -188,7 +188,7 @@ class Step(BaseDataModel): step.status = StepStatus.rejected step.human_approval_status = HumanFeedbackStatus.rejected self._memory.update_step(step) - track_event( + track_event_if_configured( "Group Chat Manager - Step has been rejected and updated into the cosmos", { "status": StepStatus.rejected, @@ -213,7 +213,7 @@ async def _update_step_status( step.human_feedback = received_human_feedback step.status = StepStatus.completed await self._memory.update_step(step) - track_event( + track_event_if_configured( "Group Chat Manager - Received human feedback, Updating step and updated into the cosmos", { "status": StepStatus.completed, @@ -241,7 +241,7 @@ async def _execute_step(self, session_id: str, step: Step): # Update step status to 'action_requested' step.status = StepStatus.action_requested await self._memory.update_step(step) - track_event( + track_event_if_configured( "Group Chat Manager - Update step to action_requested and updated into the cosmos", { "status": StepStatus.action_requested, @@ -304,7 +304,7 @@ async def _execute_step(self, session_id: str, step: Step): ) ) - track_event( + track_event_if_configured( f"Group Chat Manager - Requesting {step.agent.value.title()} to perform the action and added into the cosmos", { "session_id": session_id, @@ -338,7 +338,7 @@ async def _execute_step(self, session_id: str, step: Step): logging.info( "Marking the step as complete - Since we have received the human feedback" ) - track_event( + track_event_if_configured( "Group Chat Manager - Steps completed - Received the human feedback and updated into the cosmos", { "session_id": session_id, diff --git a/src/backend/agents/human.py b/src/backend/agents/human.py index c65b4bce5..6292fef7e 100644 --- a/src/backend/agents/human.py +++ b/src/backend/agents/human.py @@ -12,7 +12,7 @@ AgentMessage, Step, ) -from azure.monitor.events.extension import track_event +from event_utils import track_event_if_configured @default_subscription @@ -57,7 +57,7 @@ async def handle_step_feedback( ) ) logging.info(f"HumanAgent received feedback for step: {step}") - track_event( + track_event_if_configured( f"Human Agent - Received feedback for step: {step} and added into the cosmos", { "session_id": message.session_id, @@ -81,7 +81,7 @@ async def handle_step_feedback( ) logging.info(f"HumanAgent sent approval request for step: {step}") - track_event( + track_event_if_configured( f"Human Agent - Approval request sent for step {step} and added into the cosmos", { "session_id": message.session_id, diff --git a/src/backend/agents/planner.py b/src/backend/agents/planner.py index ae8bf2601..837684434 100644 --- a/src/backend/agents/planner.py +++ b/src/backend/agents/planner.py @@ -26,7 +26,7 @@ HumanFeedbackStatus, ) -from azure.monitor.events.extension import track_event +from event_utils import track_event_if_configured @default_subscription @@ -74,7 +74,7 @@ async def handle_input_task(self, message: InputTask, ctx: MessageContext) -> Pl ) logging.info(f"Plan generated: {plan.summary}") - track_event( + track_event_if_configured( f"Planner - Generated a plan with {len(steps)} steps and added plan into the cosmos", { "session_id": message.session_id, @@ -101,7 +101,7 @@ async def handle_input_task(self, message: InputTask, ctx: MessageContext) -> Pl f"Additional information requested: {plan.human_clarification_request}" ) - track_event( + track_event_if_configured( "Planner - Additional information requested and added into the cosmos", { "session_id": message.session_id, @@ -138,7 +138,7 @@ async def handle_plan_clarification( ) ) - track_event( + track_event_if_configured( "Planner - Store HumanAgent clarification and added into the cosmos", { "session_id": message.session_id, @@ -160,7 +160,7 @@ async def handle_plan_clarification( ) logging.info("Plan updated with HumanClarification.") - track_event( + track_event_if_configured( "Planner - Updated with HumanClarification and added into the cosmos", { "session_id": message.session_id, @@ -254,7 +254,7 @@ class StructuredOutputPlan(BaseModel): structured_plan = StructuredOutputPlan(**parsed_result) if not structured_plan.steps: - track_event( + track_event_if_configured( "Planner agent - No steps found", { "session_id": self._session_id, @@ -282,7 +282,7 @@ class StructuredOutputPlan(BaseModel): # Store the plan in memory await self._memory.add_plan(plan) - track_event( + track_event_if_configured( "Planner - Initial plan and added into the cosmos", { "session_id": self._session_id, @@ -308,7 +308,7 @@ class StructuredOutputPlan(BaseModel): human_approval_status=HumanFeedbackStatus.requested, ) await self._memory.add_step(step) - track_event( + track_event_if_configured( "Planner - Added planned individual step into the cosmos", { "plan_id": plan.id, @@ -326,7 +326,7 @@ class StructuredOutputPlan(BaseModel): except Exception as e: logging.exception(f"Error in create_structured_plan: {e}") - track_event( + track_event_if_configured( f"Planner - Error in create_structured_plan: {e} into the cosmos", { "session_id": self._session_id, diff --git a/src/backend/app.py b/src/backend/app.py index eea1e5d02..bcde79b2b 100644 --- a/src/backend/app.py +++ b/src/backend/app.py @@ -20,13 +20,20 @@ PlanWithSteps, ) from utils import initialize_runtime_and_context, retrieve_all_agent_tools, rai_success +from event_utils import track_event_if_configured from fastapi.middleware.cors import CORSMiddleware from azure.monitor.opentelemetry import configure_azure_monitor -from azure.monitor.events.extension import track_event -configure_azure_monitor( - connection_string=os.getenv("APPLICATIONINSIGHTS_INSTRUMENTATION_KEY") -) + +# Check if the Application Insights Instrumentation Key is set in the environment variables +instrumentation_key = os.getenv("APPLICATIONINSIGHTS_INSTRUMENTATION_KEY") +if instrumentation_key: + # Configure Application Insights if the Instrumentation Key is found + configure_azure_monitor(connection_string=instrumentation_key) + logging.info("Application Insights configured with the provided Instrumentation Key") +else: + # Log a warning if the Instrumentation Key is not found + logging.warning("No Application Insights Instrumentation Key found. Skipping configuration") # Configure logging logging.basicConfig(level=logging.INFO) @@ -113,7 +120,7 @@ async def input_task_endpoint(input_task: InputTask, request: Request): if not rai_success(input_task.description): print("RAI failed") - track_event( + track_event_if_configured( "RAI failed", { "status": "Plan not created", @@ -129,7 +136,7 @@ async def input_task_endpoint(input_task: InputTask, request: Request): user_id = authenticated_user["user_principal_id"] if not user_id: - track_event("UserIdNotFound", {"status_code": 400, "detail": "no user"}) + track_event_if_configured("UserIdNotFound", {"status_code": 400, "detail": "no user"}) raise HTTPException(status_code=400, detail="no user") if not input_task.session_id: @@ -150,7 +157,7 @@ async def input_task_endpoint(input_task: InputTask, request: Request): logging.info(f"Plan created: {plan.summary}") # Log custom event for successful input task processing - track_event( + track_event_if_configured( "InputTaskProcessed", { "status": f"Plan created:\n {plan.summary}" @@ -231,7 +238,7 @@ async def human_feedback_endpoint(human_feedback: HumanFeedback, request: Reques authenticated_user = get_authenticated_user_details(request_headers=request.headers) user_id = authenticated_user["user_principal_id"] if not user_id: - track_event("UserIdNotFound", {"status_code": 400, "detail": "no user"}) + track_event_if_configured("UserIdNotFound", {"status_code": 400, "detail": "no user"}) raise HTTPException(status_code=400, detail="no user") # Initialize runtime and context runtime, _ = await initialize_runtime_and_context( @@ -242,7 +249,7 @@ async def human_feedback_endpoint(human_feedback: HumanFeedback, request: Reques human_agent_id = AgentId("human_agent", human_feedback.session_id) await runtime.send_message(human_feedback, human_agent_id) - track_event( + track_event_if_configured( "Completed Feedback received", { "status": "Feedback received", @@ -308,7 +315,7 @@ async def human_clarification_endpoint( authenticated_user = get_authenticated_user_details(request_headers=request.headers) user_id = authenticated_user["user_principal_id"] if not user_id: - track_event("UserIdNotFound", {"status_code": 400, "detail": "no user"}) + track_event_if_configured("UserIdNotFound", {"status_code": 400, "detail": "no user"}) raise HTTPException(status_code=400, detail="no user") # Initialize runtime and context runtime, _ = await initialize_runtime_and_context( @@ -319,7 +326,7 @@ async def human_clarification_endpoint( planner_agent_id = AgentId("planner_agent", human_clarification.session_id) await runtime.send_message(human_clarification, planner_agent_id) - track_event( + track_event_if_configured( "Completed Human clarification on the plan", { "status": "Clarification received", @@ -390,7 +397,7 @@ async def approve_step_endpoint( authenticated_user = get_authenticated_user_details(request_headers=request.headers) user_id = authenticated_user["user_principal_id"] if not user_id: - track_event("UserIdNotFound", {"status_code": 400, "detail": "no user"}) + track_event_if_configured("UserIdNotFound", {"status_code": 400, "detail": "no user"}) raise HTTPException(status_code=400, detail="no user") # Initialize runtime and context runtime, _ = await initialize_runtime_and_context(user_id=user_id) @@ -405,7 +412,7 @@ async def approve_step_endpoint( ) # Return a status message if human_feedback.step_id: - track_event( + track_event_if_configured( "Completed Human clarification with step_id", { "status": f"Step {human_feedback.step_id} - Approval:{human_feedback.approved}." @@ -416,7 +423,7 @@ async def approve_step_endpoint( "status": f"Step {human_feedback.step_id} - Approval:{human_feedback.approved}." } else: - track_event( + track_event_if_configured( "Completed Human clarification without step_id", {"status": "All steps approved"}, ) @@ -488,7 +495,7 @@ async def get_plans( authenticated_user = get_authenticated_user_details(request_headers=request.headers) user_id = authenticated_user["user_principal_id"] if not user_id: - track_event("UserIdNotFound", {"status_code": 400, "detail": "no user"}) + track_event_if_configured("UserIdNotFound", {"status_code": 400, "detail": "no user"}) raise HTTPException(status_code=400, detail="no user") cosmos = CosmosBufferedChatCompletionContext(session_id or "", user_id) @@ -496,7 +503,7 @@ async def get_plans( if session_id: plan = await cosmos.get_plan_by_session(session_id=session_id) if not plan: - track_event( + track_event_if_configured( "GetPlanBySessionNotFound", {"status_code": 400, "detail": "Plan not found"}, ) @@ -576,7 +583,7 @@ async def get_steps_by_plan(plan_id: str, request: Request) -> List[Step]: authenticated_user = get_authenticated_user_details(request_headers=request.headers) user_id = authenticated_user["user_principal_id"] if not user_id: - track_event("UserIdNotFound", {"status_code": 400, "detail": "no user"}) + track_event_if_configured("UserIdNotFound", {"status_code": 400, "detail": "no user"}) raise HTTPException(status_code=400, detail="no user") cosmos = CosmosBufferedChatCompletionContext("", user_id) steps = await cosmos.get_steps_by_plan(plan_id=plan_id) @@ -634,7 +641,7 @@ async def get_agent_messages(session_id: str, request: Request) -> List[AgentMes authenticated_user = get_authenticated_user_details(request_headers=request.headers) user_id = authenticated_user["user_principal_id"] if not user_id: - track_event("UserIdNotFound", {"status_code": 400, "detail": "no user"}) + track_event_if_configured("UserIdNotFound", {"status_code": 400, "detail": "no user"}) raise HTTPException(status_code=400, detail="no user") cosmos = CosmosBufferedChatCompletionContext(session_id, user_id) agent_messages = await cosmos.get_data_by_type("agent_message") diff --git a/src/backend/event_utils.py b/src/backend/event_utils.py new file mode 100644 index 000000000..9b9e5bbf0 --- /dev/null +++ b/src/backend/event_utils.py @@ -0,0 +1,11 @@ +import logging +import os +from azure.monitor.events.extension import track_event + + +def track_event_if_configured(event_name: str, event_data: dict): + instrumentation_key = os.getenv("APPLICATIONINSIGHTS_INSTRUMENTATION_KEY") + if instrumentation_key: + track_event(event_name, event_data) + else: + logging.warning(f"Skipping track_event for {event_name} as Application Insights is not configured")