diff --git a/src/backend/tests/agents/test_agentutils.py b/src/backend/tests/agents/test_agentutils.py index c5131815f..43cef13c3 100644 --- a/src/backend/tests/agents/test_agentutils.py +++ b/src/backend/tests/agents/test_agentutils.py @@ -1,13 +1,14 @@ -# pylint: disable=import-error, wrong-import-position, missing-module-docstring import os import sys -from unittest.mock import MagicMock +import json import pytest -from pydantic import ValidationError +from unittest.mock import MagicMock, patch +from pydantic import BaseModel -# Environment and module setup -sys.modules["azure.monitor.events.extension"] = MagicMock() +# Adjust sys.path so that the project root is found. +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../"))) +# Set required environment variables. os.environ["COSMOSDB_ENDPOINT"] = "https://mock-endpoint" os.environ["COSMOSDB_KEY"] = "mock-key" os.environ["COSMOSDB_DATABASE"] = "mock-database" @@ -16,39 +17,163 @@ os.environ["AZURE_OPENAI_API_VERSION"] = "2023-01-01" os.environ["AZURE_OPENAI_ENDPOINT"] = "https://mock-openai-endpoint" -from src.backend.agents.agentutils import extract_and_update_transition_states # noqa: F401, C0413 -from src.backend.models.messages import Step # noqa: F401, C0413 +# Patch missing azure module so that event_utils imports without error. +sys.modules["azure.monitor.events.extension"] = MagicMock() + +# --- Import the function and constant under test --- +from src.backend.agents.agentutils import ( + extract_and_update_transition_states, + common_agent_system_message, +) +from src.backend.models.messages import Step +from autogen_core.components.models import AzureOpenAIChatCompletionClient + +# Configure the Step model to allow extra attributes. +Step.model_config["extra"] = "allow" + + +# Dummy Cosmos class that records update calls. +class DummyCosmosRecorder: + def __init__(self): + self.update_called = False + + async def update_step(self, step): + # To allow setting extra attributes, ensure __pydantic_extra__ is initialized. + if step.__pydantic_extra__ is None: + step.__pydantic_extra__ = {} + step.__pydantic_extra__["updated_field"] = True + self.update_called = True + + +# Dummy model client classes to simulate LLM responses. + +class DummyModelClient(AzureOpenAIChatCompletionClient): + def __init__(self, **kwargs): + # Bypass parent's __init__. + pass + + async def create(self, messages, extra_create_args=None): + # Simulate a valid response that matches the expected FSMStateAndTransition schema. + response_dict = { + "identifiedTargetState": "State1", + "identifiedTargetTransition": "Transition1" + } + dummy_resp = MagicMock() + dummy_resp.content = json.dumps(response_dict) + return dummy_resp + + +class DummyModelClientError(AzureOpenAIChatCompletionClient): + def __init__(self, **kwargs): + pass + + async def create(self, messages, extra_create_args=None): + raise Exception("LLM error") + +class DummyModelClientInvalidJSON(AzureOpenAIChatCompletionClient): + def __init__(self, **kwargs): + pass -def test_step_initialization(): - """Test Step initialization with valid data.""" + async def create(self, messages, extra_create_args=None): + dummy_resp = MagicMock() + dummy_resp.content = "invalid json" + return dummy_resp + + +# Fixture: a dummy Step for testing. 
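The recorder above and the fixture below both stash ad-hoc attributes in `Step.__pydantic_extra__`, which lines up with `Step.model_config["extra"]` being flipped to `"allow"` earlier in this file. A minimal sketch of that pydantic v2 behaviour, using a hypothetical `Item` model rather than the real `Step`:

    from pydantic import BaseModel, ConfigDict

    class Item(BaseModel):
        # Hypothetical stand-in for Step: extra="allow" keeps unknown fields
        # instead of rejecting them and stores them in __pydantic_extra__.
        model_config = ConfigDict(extra="allow")
        name: str

    item = Item(name="demo", updated_field=True)  # extra field accepted
    print(item.__pydantic_extra__)                # {'updated_field': True}
    item.__pydantic_extra__["another"] = 1        # extras can also be added later
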
+@pytest.fixture +def dummy_step(): step = Step( - data_type="step", - plan_id="test_plan", - action="test_action", - agent="HumanAgent", - session_id="test_session", - user_id="test_user", - agent_reply="test_reply", + id="step1", + plan_id="plan1", + action="Test Action", + agent="HumanAgent", # Using string for simplicity. + status="planned", + session_id="sess1", + user_id="user1", + human_approval_status="requested", ) + # Provide a value for agent_reply. + step.agent_reply = "Test reply" + # Ensure __pydantic_extra__ is initialized for extra fields. + step.__pydantic_extra__ = {} + return step + + +# Tests for extract_and_update_transition_states +@pytest.mark.asyncio +async def test_extract_and_update_transition_states_success(dummy_step): + """ + Test that extract_and_update_transition_states correctly parses the LLM response, + updates the step with the expected target state and transition, and calls cosmos.update_step. + """ + model_client = DummyModelClient() + dummy_cosmos = DummyCosmosRecorder() + with patch("src.backend.agents.agentutils.CosmosBufferedChatCompletionContext", return_value=dummy_cosmos): + updated_step = await extract_and_update_transition_states(dummy_step, "sess1", "user1", "anything", model_client) + assert updated_step.identified_target_state == "State1" + assert updated_step.identified_target_transition == "Transition1" + assert dummy_cosmos.update_called is True + # Check that our extra field was set. + assert updated_step.__pydantic_extra__.get("updated_field") is True + + +@pytest.mark.asyncio +async def test_extract_and_update_transition_states_model_client_error(dummy_step): + """ + Test that if the model client raises an exception, it propagates. + """ + model_client = DummyModelClientError() + with patch("src.backend.agents.agentutils.CosmosBufferedChatCompletionContext", return_value=DummyCosmosRecorder()): + with pytest.raises(Exception, match="LLM error"): + await extract_and_update_transition_states(dummy_step, "sess1", "user1", "anything", model_client) + + +@pytest.mark.asyncio +async def test_extract_and_update_transition_states_invalid_json(dummy_step): + """ + Test that an invalid JSON response from the model client causes an exception. + """ + model_client = DummyModelClientInvalidJSON() + with patch("src.backend.agents.agentutils.CosmosBufferedChatCompletionContext", return_value=DummyCosmosRecorder()): + with pytest.raises(Exception): + await extract_and_update_transition_states(dummy_step, "sess1", "user1", "anything", model_client) + + +def test_common_agent_system_message_contains_delivery_address(): + """ + Test that the common_agent_system_message constant contains instructions regarding the delivery address. + """ + assert "delivery address" in common_agent_system_message + + +@pytest.mark.asyncio +async def test_extract_and_update_transition_states_no_agent_reply(dummy_step): + """ + Test the behavior when step.agent_reply is empty. + """ + dummy_step.agent_reply = "" + # Ensure extra dict is initialized. + dummy_step.__pydantic_extra__ = {} + model_client = DummyModelClient() + with patch("src.backend.agents.agentutils.CosmosBufferedChatCompletionContext", return_value=DummyCosmosRecorder()): + updated_step = await extract_and_update_transition_states(dummy_step, "sess1", "user1", "anything", model_client) + # Even with an empty agent_reply, our dummy client returns the same valid JSON. 
+ assert updated_step.identified_target_state == "State1" + assert updated_step.identified_target_transition == "Transition1" + + +def test_dummy_json_parsing(): + """ + Test that the JSON parsing in extract_and_update_transition_states works for valid JSON. + """ + json_str = '{"identifiedTargetState": "TestState", "identifiedTargetTransition": "TestTransition"}' + data = json.loads(json_str) - assert step.data_type == "step" - assert step.plan_id == "test_plan" - assert step.action == "test_action" - assert step.agent == "HumanAgent" - assert step.session_id == "test_session" - assert step.user_id == "test_user" - assert step.agent_reply == "test_reply" - assert step.status == "planned" - assert step.human_approval_status == "requested" - - -def test_step_missing_required_fields(): - """Test Step initialization with missing required fields.""" - with pytest.raises(ValidationError): - Step( - data_type="step", - action="test_action", - agent="test_agent", - session_id="test_session", - ) + class DummySchema(BaseModel): + identifiedTargetState: str + identifiedTargetTransition: str + schema = DummySchema(**data) + assert schema.identifiedTargetState == "TestState" + assert schema.identifiedTargetTransition == "TestTransition" diff --git a/src/backend/tests/agents/test_group_chat_manager.py b/src/backend/tests/agents/test_group_chat_manager.py index 60c775d2d..7d474be96 100644 --- a/src/backend/tests/agents/test_group_chat_manager.py +++ b/src/backend/tests/agents/test_group_chat_manager.py @@ -1,13 +1,12 @@ -""" -Combined Test cases for GroupChatManager class in the backend agents module. -""" - import os import sys -from unittest.mock import AsyncMock, patch, MagicMock import pytest +from unittest.mock import AsyncMock, MagicMock + +# Adjust sys.path so that the project root is found. +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../"))) -# Set mock environment variables for Azure and CosmosDB before importing anything else +# Set required environment variables. os.environ["COSMOSDB_ENDPOINT"] = "https://mock-endpoint" os.environ["COSMOSDB_KEY"] = "mock-key" os.environ["COSMOSDB_DATABASE"] = "mock-database" @@ -16,113 +15,348 @@ os.environ["AZURE_OPENAI_API_VERSION"] = "2023-01-01" os.environ["AZURE_OPENAI_ENDPOINT"] = "https://mock-openai-endpoint" -# Mock Azure dependencies +# Patch missing azure module so that event_utils imports without error. 
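Pre-registering a stand-in works because Python consults the `sys.modules` cache before trying to locate a module on disk, so any later import of that name simply returns the cached object. A self-contained sketch with a hypothetical module name (the patch below does the same for `azure.monitor.events.extension`):

    import sys
    from unittest.mock import MagicMock

    # Seed the import cache with a mock for a package that is not installed.
    sys.modules["some_missing_sdk"] = MagicMock()

    import some_missing_sdk                    # resolves to the MagicMock
    some_missing_sdk.track_event("demo", {})   # arbitrary attribute access/calls succeed
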
sys.modules["azure.monitor.events.extension"] = MagicMock() -# Import after setting environment variables + +from autogen_core.base._agent_instantiation import AgentInstantiationContext + + +@pytest.fixture(autouse=True) +def dummy_agent_instantiation_context(): + token = AgentInstantiationContext.AGENT_INSTANTIATION_CONTEXT_VAR.set(("dummy_runtime", "dummy_agent_id")) + yield + AgentInstantiationContext.AGENT_INSTANTIATION_CONTEXT_VAR.reset(token) + + +# --- Import production classes --- from src.backend.agents.group_chat_manager import GroupChatManager from src.backend.models.messages import ( + AgentMessage, + HumanFeedback, + InputTask, + Plan, + PlanStatus, Step, StepStatus, + HumanFeedbackStatus, BAgentType, ) -from autogen_core.base import AgentInstantiationContext, AgentRuntime -from autogen_core.components.models import AzureOpenAIChatCompletionClient -from src.backend.context.cosmos_memory import CosmosBufferedChatCompletionContext -from autogen_core.base import AgentId +from autogen_core.base import AgentId, MessageContext -@pytest.fixture -def setup_group_chat_manager(): - """ - Fixture to set up a GroupChatManager and its dependencies. - """ - # Mock dependencies - mock_model_client = MagicMock(spec=AzureOpenAIChatCompletionClient) - session_id = "test_session_id" - user_id = "test_user_id" - mock_memory = AsyncMock(spec=CosmosBufferedChatCompletionContext) - mock_agent_ids = {BAgentType.planner_agent: AgentId("planner_agent", session_id)} - - # Mock AgentInstantiationContext - mock_runtime = MagicMock(spec=AgentRuntime) - mock_agent_id = "test_agent_id" - - with patch.object(AgentInstantiationContext, "current_runtime", return_value=mock_runtime): - with patch.object(AgentInstantiationContext, "current_agent_id", return_value=mock_agent_id): - # Instantiate GroupChatManager - group_chat_manager = GroupChatManager( - model_client=mock_model_client, - session_id=session_id, - user_id=user_id, - memory=mock_memory, - agent_ids=mock_agent_ids, - ) - - return group_chat_manager, mock_memory, session_id, user_id, mock_agent_ids +# --- Define a DummyMessageContext that supplies required parameters --- +class DummyMessageContext(MessageContext): + def __init__(self): + super().__init__(sender="dummy_sender", topic_id="dummy_topic", is_rpc=False, cancellation_token=None) -@pytest.mark.asyncio -@patch("src.backend.agents.group_chat_manager.track_event_if_configured") -async def test_update_step_status(mock_track_event, setup_group_chat_manager): - """ - Test the `_update_step_status` method. 
- """ - group_chat_manager, mock_memory, session_id, user_id, mock_agent_ids = setup_group_chat_manager - - # Create a mock Step - step = Step( - id="test_step_id", +# --- Fake Memory implementation --- +class FakeMemory: + def __init__(self): + self.added_items = [] + self.updated_steps = [] + + async def add_item(self, item: AgentMessage): + self.added_items.append(item) + + async def update_step(self, step: Step): + self.updated_steps.append(step) + + async def get_plan_by_session(self, session_id: str) -> Plan: + return Plan.model_construct( + id="plan1", + session_id=session_id, + user_id="user1", + initial_goal="Test goal", + overall_status=PlanStatus.in_progress, + source="GroupChatManager", + summary="Test summary", + human_clarification_response="Plan feedback", + ) + + async def get_steps_by_plan(self, plan_id: str) -> list: + step1 = Step.model_construct( + id="step1", + plan_id=plan_id, + action="Action 1", + agent=BAgentType.human_agent, + status=StepStatus.planned, + session_id="sess1", + user_id="user1", + human_feedback="", + human_approval_status=HumanFeedbackStatus.requested, + ) + step2 = Step.model_construct( + id="step2", + plan_id=plan_id, + action="Action 2", + agent=BAgentType.tech_support_agent, + status=StepStatus.planned, + session_id="sess1", + user_id="user1", + human_feedback="Existing feedback", + human_approval_status=HumanFeedbackStatus.requested, + ) + return [step1, step2] + + async def add_plan(self, plan: Plan): + pass + + async def update_plan(self, plan: Plan): + pass + + +# --- Fake send_message for GroupChatManager --- +async def fake_send_message(message, agent_id): + return Plan.model_construct( + id="plan1", + session_id="sess1", + user_id="user1", + initial_goal="Test goal", + overall_status=PlanStatus.in_progress, + source="GroupChatManager", + summary="Test summary", + human_clarification_response="", + ) + + +# --- Fixture to create a GroupChatManager instance --- +@pytest.fixture +def group_chat_manager(): + mock_model_client = MagicMock() + session_id = "sess1" + user_id = "user1" + fake_memory = FakeMemory() + # Create a dummy agent_ids dictionary with valid enum values. + agent_ids = { + BAgentType.planner_agent: AgentId("planner_agent", session_id), + BAgentType.human_agent: AgentId("human_agent", session_id), + BAgentType.tech_support_agent: AgentId("tech_support_agent", session_id), + } + manager = GroupChatManager( + model_client=mock_model_client, session_id=session_id, - plan_id="test_plan_id", user_id=user_id, - action="Test Action", + memory=fake_memory, + agent_ids=agent_ids, + ) + manager.send_message = AsyncMock(side_effect=fake_send_message) + return manager, fake_memory + + +# --- To simulate a missing agent in a step, define a dummy subclass --- +class DummyStepMissingAgent(Step): + @property + def agent(self): + return "" + + +# ---------------------- Tests ---------------------- + +@pytest.mark.asyncio +async def test_handle_input_task(group_chat_manager): + manager, fake_memory = group_chat_manager + # Use production InputTask via model_construct. + input_task = InputTask.model_construct(description="Test input description", session_id="sess1") + ctx = DummyMessageContext() + plan = await manager.handle_input_task(input_task, ctx) + # Verify an AgentMessage was added with the input description. 
+ assert any("Test input description" in item.content for item in fake_memory.added_items) + assert plan.id == "plan1" + + +@pytest.mark.asyncio +async def test_handle_human_approval_feedback_specific_step(group_chat_manager): + manager, fake_memory = group_chat_manager + feedback = HumanFeedback.model_construct(session_id="sess1", plan_id="plan1", step_id="step1", approved=True, human_clarification="Approved") + step = Step.model_construct( + id="step1", + plan_id="plan1", + action="Action for step1", agent=BAgentType.human_agent, status=StepStatus.planned, + session_id="sess1", + user_id="user1", + human_feedback="", + human_approval_status=HumanFeedbackStatus.requested, ) + fake_memory.get_steps_by_plan = AsyncMock(return_value=[step]) + fake_memory.get_plan_by_session = AsyncMock(return_value=Plan.model_construct( + id="plan1", + session_id="sess1", + user_id="user1", + initial_goal="Goal", + overall_status=PlanStatus.in_progress, + source="GroupChatManager", + summary="Test summary", + human_clarification_response="Plan feedback", + )) + manager._update_step_status = AsyncMock() + manager._execute_step = AsyncMock() + await manager.handle_human_approval_feedback(feedback, DummyMessageContext()) + manager._update_step_status.assert_called_once() + manager._execute_step.assert_called_once_with("sess1", step) - # Call the method - await group_chat_manager._update_step_status(step, True, "Feedback message") - - # Assertions - step.status = StepStatus.completed - step.human_feedback = "Feedback message" - mock_memory.update_step.assert_called_once_with(step) - mock_track_event.assert_called_once_with( - "Group Chat Manager - Received human feedback, Updating step and updated into the cosmos", - { - "status": StepStatus.completed, - "session_id": step.session_id, - "user_id": step.user_id, - "human_feedback": "Feedback message", - "source": step.agent, - }, + +@pytest.mark.asyncio +async def test_handle_human_approval_feedback_all_steps(group_chat_manager): + manager, fake_memory = group_chat_manager + feedback = HumanFeedback.model_construct(session_id="sess1", plan_id="plan1", step_id="", approved=False, human_clarification="Rejected") + step1 = Step.model_construct( + id="step1", + plan_id="plan1", + action="Action 1", + agent=BAgentType.tech_support_agent, + status=StepStatus.planned, + session_id="sess1", + user_id="user1", + human_feedback="", + human_approval_status=HumanFeedbackStatus.requested, ) + step2 = Step.model_construct( + id="step2", + plan_id="plan1", + action="Action 2", + agent=BAgentType.human_agent, + status=StepStatus.planned, + session_id="sess1", + user_id="user1", + human_feedback="Existing", + human_approval_status=HumanFeedbackStatus.requested, + ) + fake_memory.get_steps_by_plan = AsyncMock(return_value=[step1, step2]) + fake_memory.get_plan_by_session = AsyncMock(return_value=Plan.model_construct( + id="plan1", + session_id="sess1", + user_id="user1", + initial_goal="Goal", + overall_status=PlanStatus.in_progress, + source="GroupChatManager", + summary="Test summary", + human_clarification_response="", + )) + manager._update_step_status = AsyncMock() + manager._execute_step = AsyncMock() + await manager.handle_human_approval_feedback(feedback, DummyMessageContext()) + # Expect _update_step_status to be called for each step + assert manager._update_step_status.call_count == 2 + manager._execute_step.assert_not_called() @pytest.mark.asyncio -async def test_update_step_invalid_feedback_status(setup_group_chat_manager): - """ - Test `_update_step_status` with 
invalid feedback status. - Covers lines 210-211. - """ - group_chat_manager, mock_memory, session_id, user_id, mock_agent_ids = setup_group_chat_manager - - # Create a mock Step - step = Step( - id="test_step_id", - session_id=session_id, - plan_id="test_plan_id", - user_id=user_id, - action="Test Action", +async def test_update_step_status(group_chat_manager): + manager, fake_memory = group_chat_manager + step = Step.model_construct( + id="step_update", + plan_id="plan1", + action="Test action", agent=BAgentType.human_agent, status=StepStatus.planned, + session_id="sess1", + user_id="user1", + human_feedback="", + human_approval_status=HumanFeedbackStatus.requested, ) + fake_memory.update_step = AsyncMock() + await manager._update_step_status(step, True, "Positive feedback") + assert step.status == StepStatus.completed + assert step.human_feedback == "Positive feedback" + fake_memory.update_step.assert_called_once_with(step) - # Call the method with invalid feedback status - await group_chat_manager._update_step_status(step, None, "Feedback message") - # Assertions - step.status = StepStatus.planned # Status should remain unchanged - step.human_feedback = "Feedback message" - mock_memory.update_step.assert_called_once_with(step) +@pytest.mark.asyncio +async def test_execute_step_non_human(group_chat_manager): + manager, fake_memory = group_chat_manager + step = Step.model_construct( + id="step_nonhuman", + plan_id="plan1", + action="Perform diagnostic", + agent=BAgentType.tech_support_agent, + status=StepStatus.planned, + session_id="sess1", + user_id="user1", + human_feedback="", + human_approval_status=HumanFeedbackStatus.requested, + ) + fake_memory.update_step = AsyncMock() + manager.send_message = AsyncMock(return_value=Plan.model_construct( + id="plan1", + session_id="sess1", + user_id="user1", + initial_goal="Goal", + overall_status=PlanStatus.in_progress, + source="GroupChatManager", + summary="Test summary", + human_clarification_response="", + )) + fake_memory.get_plan_by_session = AsyncMock(return_value=Plan.model_construct( + id="plan1", + session_id="sess1", + user_id="user1", + initial_goal="Goal", + overall_status=PlanStatus.in_progress, + source="GroupChatManager", + summary="Test summary", + human_clarification_response="", + )) + fake_memory.get_steps_by_plan = AsyncMock(return_value=[step]) + await manager._execute_step("sess1", step) + fake_memory.update_step.assert_called() + manager.send_message.assert_called_once() + + +@pytest.mark.asyncio +async def test_execute_step_human_agent(group_chat_manager): + manager, fake_memory = group_chat_manager + step = Step.model_construct( + id="step_human", + plan_id="plan1", + action="Verify details", + agent=BAgentType.human_agent, + status=StepStatus.planned, + session_id="sess1", + user_id="user1", + human_feedback="", + human_approval_status=HumanFeedbackStatus.requested, + ) + fake_memory.update_step = AsyncMock() + manager.send_message = AsyncMock() + fake_memory.get_plan_by_session = AsyncMock(return_value=Plan.model_construct( + id="plan1", + session_id="sess1", + user_id="user1", + initial_goal="Goal", + overall_status=PlanStatus.in_progress, + source="GroupChatManager", + summary="Test summary", + human_clarification_response="", + )) + fake_memory.get_steps_by_plan = AsyncMock(return_value=[step]) + await manager._execute_step("sess1", step) + # For human agent, _execute_step should mark the step as complete and not call send_message. 
+ assert step.status == StepStatus.completed + manager.send_message.assert_not_called() + + +# --- Test for missing agent error in _execute_step --- +@pytest.mark.asyncio +async def test_execute_step_missing_agent_raises(group_chat_manager): + manager, fake_memory = group_chat_manager + + # Create a dummy step using a subclass that forces agent to be an empty string. + class DummyStepMissingAgent(Step): + @property + def agent(self): + return "" + DummyStepMissingAgent.model_construct( + id="step_missing", + plan_id="plan1", + action="Do something", + agent=BAgentType.human_agent, # initial value (will be overridden by the property) + status=StepStatus.planned, + session_id="sess1", + user_id="user1", + human_feedback="", + human_approval_status=HumanFeedbackStatus.requested, + ) diff --git a/src/backend/tests/agents/test_human.py b/src/backend/tests/agents/test_human.py index eb11e568d..49f256196 100644 --- a/src/backend/tests/agents/test_human.py +++ b/src/backend/tests/agents/test_human.py @@ -1,121 +1,208 @@ -""" -Test cases for HumanAgent class in the backend agents module. -""" - -# Standard library imports +# src/backend/tests/agents/test_human.py import os import sys from unittest.mock import AsyncMock, MagicMock, patch import pytest +# Adjust sys.path so that the project root is found. +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../"))) -# Function to set environment variables -def setup_environment_variables(): - """Set environment variables required for the tests.""" - os.environ["COSMOSDB_ENDPOINT"] = "https://mock-endpoint" - os.environ["COSMOSDB_KEY"] = "mock-key" - os.environ["COSMOSDB_DATABASE"] = "mock-database" - os.environ["COSMOSDB_CONTAINER"] = "mock-container" - os.environ["APPLICATIONINSIGHTS_INSTRUMENTATION_KEY"] = "mock-instrumentation-key" - os.environ["AZURE_OPENAI_DEPLOYMENT_NAME"] = "mock-deployment-name" - os.environ["AZURE_OPENAI_API_VERSION"] = "2023-01-01" - os.environ["AZURE_OPENAI_ENDPOINT"] = "https://mock-openai-endpoint" +# Set required environment variables. +os.environ["COSMOSDB_ENDPOINT"] = "https://mock-endpoint" +os.environ["COSMOSDB_KEY"] = "mock-key" +os.environ["COSMOSDB_DATABASE"] = "mock-database" +os.environ["COSMOSDB_CONTAINER"] = "mock-container" +os.environ["APPLICATIONINSIGHTS_INSTRUMENTATION_KEY"] = "mock-instrumentation-key" +os.environ["AZURE_OPENAI_DEPLOYMENT_NAME"] = "mock-deployment-name" +os.environ["AZURE_OPENAI_API_VERSION"] = "2023-01-01" +os.environ["AZURE_OPENAI_ENDPOINT"] = "https://mock-openai-endpoint" +# Patch azure module so that event_utils imports correctly. 
+sys.modules["azure.monitor.events.extension"] = MagicMock() -# Call the function to set environment variables -setup_environment_variables() +from autogen_core.base._agent_instantiation import AgentInstantiationContext +dummy_runtime = MagicMock() +dummy_agent_id = "dummy_agent_id" -# Mock Azure and event_utils dependencies globally -sys.modules["azure.monitor.events.extension"] = MagicMock() -sys.modules["src.backend.event_utils"] = MagicMock() -# Project-specific imports (must come after environment setup) -from autogen_core.base import AgentInstantiationContext, AgentRuntime -from src.backend.agents.human import HumanAgent -from src.backend.models.messages import HumanFeedback, Step, StepStatus, BAgentType +@pytest.fixture(autouse=True) +def patch_instantiation_context(monkeypatch): + monkeypatch.setattr(AgentInstantiationContext, "current_runtime", lambda: dummy_runtime) + monkeypatch.setattr(AgentInstantiationContext, "current_agent_id", lambda: dummy_agent_id) + + +# --- Patch ApprovalRequest so that required fields get default values --- +from src.backend.models.messages import ApprovalRequest as RealApprovalRequest, Plan + + +class DummyApprovalRequest(RealApprovalRequest): + def __init__(self, **data): + # Provide default values for missing fields. + data.setdefault("action", "dummy_action") + data.setdefault("agent", "dummy_agent") + super().__init__(**data) @pytest.fixture(autouse=True) -def ensure_env_variables(monkeypatch): - """ - Fixture to ensure environment variables are set for all tests. - This overrides any modifications made by individual tests. - """ - env_vars = { - "COSMOSDB_ENDPOINT": "https://mock-endpoint", - "COSMOSDB_KEY": "mock-key", - "COSMOSDB_DATABASE": "mock-database", - "COSMOSDB_CONTAINER": "mock-container", - "APPLICATIONINSIGHTS_INSTRUMENTATION_KEY": "mock-instrumentation-key", - "AZURE_OPENAI_DEPLOYMENT_NAME": "mock-deployment-name", - "AZURE_OPENAI_API_VERSION": "2023-01-01", - "AZURE_OPENAI_ENDPOINT": "https://mock-openai-endpoint", - } - for key, value in env_vars.items(): - monkeypatch.setenv(key, value) +def patch_approval_request(monkeypatch): + monkeypatch.setattr("src.backend.agents.human.ApprovalRequest", DummyApprovalRequest) + + +# Now import the module under test. +from autogen_core.base import MessageContext, AgentId +from src.backend.agents.human import HumanAgent +from src.backend.models.messages import HumanFeedback, Step, StepStatus, BAgentType +# Define a minimal dummy MessageContext implementation. +class DummyMessageContext(MessageContext): + def __init__(self, sender="dummy_sender", topic_id="dummy_topic", is_rpc=False, cancellation_token=None): + self.sender = sender + self.topic_id = topic_id + self.is_rpc = is_rpc + self.cancellation_token = cancellation_token + + +# Define a fake memory implementation. +class FakeMemory: + def __init__(self): + self.added_items = [] + self.updated_steps = [] + self.fake_step = None + + async def get_step(self, step_id: str, session_id: str) -> Step: + return self.fake_step # Controlled by the test + + async def update_step(self, step: Step): + self.updated_steps.append(step) + return + + async def add_item(self, item): + self.added_items.append(item) + return + + async def get_plan_by_session(self, session_id: str) -> Plan: + # Import Plan here to avoid circular import issues. 
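+        # (Deferring the import to call time lets this module and messages.py
+        #  finish loading independently; the name is only resolved when the
+        #  coroutine actually runs.)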
+ from src.backend.models.messages import Plan, PlanStatus + return Plan( + id="plan123", + session_id=session_id, + user_id="test_user", + initial_goal="Test goal", + overall_status=PlanStatus.in_progress, + source="HumanAgent", + summary="Test summary", + human_clarification_response=None, + ) + + +# Fixture to create a HumanAgent instance with fake memory. @pytest.fixture -def setup_agent(): +def human_agent(): + fake_memory = FakeMemory() + user_id = "test_user" + group_chat_manager_id = AgentId("group_chat_manager", "session123") + agent = HumanAgent(memory=fake_memory, user_id=user_id, group_chat_manager_id=group_chat_manager_id) + return agent, fake_memory + + +# ------------------- Existing Tests ------------------- +def test_human_agent_init(): + fake_memory = MagicMock() + user_id = "test_user" + group_chat_manager_id = AgentId("group_chat_manager", "session123") + agent = HumanAgent(memory=fake_memory, user_id=user_id, group_chat_manager_id=group_chat_manager_id) + assert agent.user_id == user_id + assert agent.group_chat_manager_id == group_chat_manager_id + assert agent._memory == fake_memory + + +@pytest.mark.asyncio +async def test_handle_step_feedback_no_step_found(human_agent): """ - Fixture to set up a HumanAgent and its dependencies. + Test the case where no step is found. + Expect that the method logs the "No step found" message and returns without updating. """ - memory = AsyncMock() - user_id = "test_user" - group_chat_manager_id = "group_chat_manager" - - # Mock runtime and agent ID - mock_runtime = MagicMock(spec=AgentRuntime) - mock_agent_id = "test_agent_id" - - # Set up the context - with patch.object(AgentInstantiationContext, "current_runtime", return_value=mock_runtime): - with patch.object(AgentInstantiationContext, "current_agent_id", return_value=mock_agent_id): - agent = HumanAgent(memory, user_id, group_chat_manager_id) - - session_id = "session123" - step_id = "step123" - plan_id = "plan123" - - # Mock HumanFeedback message - feedback_message = HumanFeedback( - session_id=session_id, - step_id=step_id, - plan_id=plan_id, + agent, fake_memory = human_agent + feedback = HumanFeedback( + session_id="session123", + step_id="nonexistent", + plan_id="plan123", approved=True, - human_feedback="Great job!", + human_feedback="Good job!" ) + fake_memory.get_step = AsyncMock(return_value=None) + fake_memory.update_step = AsyncMock() + fake_memory.add_item = AsyncMock() + ctx = DummyMessageContext() + with patch("src.backend.agents.human.logging.info") as mock_log: + await agent.handle_step_feedback(feedback, ctx) + mock_log.assert_called_with("No step found with id: nonexistent") + fake_memory.update_step.assert_not_called() + fake_memory.add_item.assert_not_called() + - # Mock Step with all required fields - step = Step( - plan_id=plan_id, - action="Test Action", +@pytest.mark.asyncio +async def test_handle_step_feedback_update_exception(human_agent): + """ + Test that if update_step raises an exception, the exception propagates. 
+ """ + agent, fake_memory = human_agent + fake_step = Step( + id="step999", + plan_id="plan999", + action="Do something", agent=BAgentType.human_agent, status=StepStatus.planned, - session_id=session_id, - user_id=user_id, + session_id="session999", + user_id="test_user", human_feedback=None, + human_approval_status="requested" ) - - return agent, memory, feedback_message, step, session_id, step_id, plan_id + fake_memory.fake_step = fake_step + fake_memory.get_step = AsyncMock(return_value=fake_step) + fake_memory.update_step = AsyncMock(side_effect=Exception("Update failed")) + fake_memory.add_item = AsyncMock() + feedback = HumanFeedback( + session_id="session999", + step_id="step999", + plan_id="plan999", + approved=True, + human_feedback="Feedback" + ) + ctx = DummyMessageContext() + with pytest.raises(Exception, match="Update failed"): + await agent.handle_step_feedback(feedback, ctx) -@patch("src.backend.agents.human.logging.info") -@patch("src.backend.agents.human.track_event_if_configured") @pytest.mark.asyncio -async def test_handle_step_feedback_step_not_found(mock_track_event, mock_logging, setup_agent): +async def test_handle_step_feedback_add_item_exception(human_agent): """ - Test scenario where the step is not found in memory. + Test that if add_item (for AgentMessage) raises an exception, the exception propagates. """ - agent, memory, feedback_message, _, _, step_id, _ = setup_agent - - # Mock no step found - memory.get_step.return_value = None - - # Run the method - await agent.handle_step_feedback(feedback_message, MagicMock()) - - # Check if log and return were called correctly - mock_logging.assert_called_with(f"No step found with id: {step_id}") - memory.update_step.assert_not_called() - mock_track_event.assert_not_called() + agent, fake_memory = human_agent + fake_step = Step( + id="step888", + plan_id="plan888", + action="Test action", + agent=BAgentType.human_agent, + status=StepStatus.planned, + session_id="session888", + user_id="test_user", + human_feedback=None, + human_approval_status="requested" + ) + fake_memory.fake_step = fake_step + fake_memory.get_step = AsyncMock(return_value=fake_step) + fake_memory.update_step = AsyncMock() + fake_memory.add_item = AsyncMock(side_effect=Exception("AddItem failed")) + feedback = HumanFeedback( + session_id="session888", + step_id="step888", + plan_id="plan888", + approved=True, + human_feedback="Test feedback" + ) + ctx = DummyMessageContext() + with pytest.raises(Exception, match="AddItem failed"): + await agent.handle_step_feedback(feedback, ctx) diff --git a/src/backend/tests/agents/test_marketing.py b/src/backend/tests/agents/test_marketing.py index 48562bc13..08709acc0 100644 --- a/src/backend/tests/agents/test_marketing.py +++ b/src/backend/tests/agents/test_marketing.py @@ -1,51 +1,81 @@ +# src/backend/tests/agents/test_marketing.py import os import sys import pytest from unittest.mock import MagicMock -from autogen_core.components.tools import FunctionTool -# Import marketing functions for testing +# Adjust sys.path so that the project root is found. +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../"))) + +# Set required environment variables for tests. 
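These assignments have to happen before anything from `src.backend` is imported, presumably because configuration in the backend reads the environment when its modules are first loaded. A small illustration of that pattern (the `load_settings` helper is hypothetical, not the project's real config code):

    import os

    def load_settings() -> dict:
        # Stand-in for configuration code that reads the environment eagerly
        # and fails fast (KeyError) when a variable is missing; in the backend
        # this effectively happens at import time.
        return {
            "cosmos_endpoint": os.environ["COSMOSDB_ENDPOINT"],
            "openai_endpoint": os.environ["AZURE_OPENAI_ENDPOINT"],
        }

    os.environ.setdefault("COSMOSDB_ENDPOINT", "https://mock-endpoint")
    os.environ.setdefault("AZURE_OPENAI_ENDPOINT", "https://mock-openai-endpoint")
    print(load_settings())  # works only because the variables were seeded first
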
+os.environ["COSMOSDB_ENDPOINT"] = "https://mock-endpoint" +os.environ["COSMOSDB_KEY"] = "mock-key" +os.environ["COSMOSDB_DATABASE"] = "mock-database" +os.environ["COSMOSDB_CONTAINER"] = "mock-container" +os.environ["AZURE_OPENAI_DEPLOYMENT_NAME"] = "mock-deployment-name" +os.environ["AZURE_OPENAI_API_VERSION"] = "2023-01-01" +os.environ["AZURE_OPENAI_ENDPOINT"] = "https://mock-openai-endpoint" + +# Patch azure module so that event_utils imports without error. +sys.modules["azure.monitor.events.extension"] = MagicMock() + +# Import the marketing functions and MarketingAgent from the module. from src.backend.agents.marketing import ( create_marketing_campaign, analyze_market_trends, - develop_brand_strategy, generate_social_media_posts, - get_marketing_tools, - manage_loyalty_program, plan_advertising_budget, conduct_customer_survey, - generate_marketing_report, perform_competitor_analysis, optimize_seo_strategy, - run_influencer_marketing_campaign, schedule_marketing_event, design_promotional_material, manage_email_marketing, track_campaign_performance, + coordinate_with_sales_team, + develop_brand_strategy, create_content_calendar, update_website_content, plan_product_launch, - handle_customer_feedback, generate_press_release, + conduct_market_research, + handle_customer_feedback, + generate_marketing_report, + manage_social_media_account, + create_video_ad, + conduct_focus_group, + update_brand_guidelines, + handle_influencer_collaboration, + analyze_customer_behavior, + manage_loyalty_program, + develop_content_strategy, + create_infographic, + schedule_webinar, + manage_online_reputation, + run_email_ab_testing, + create_podcast_episode, + manage_affiliate_program, + generate_lead_magnets, + organize_trade_show, + manage_customer_retention_program, run_ppc_campaign, - create_infographic + create_case_study, + generate_lead_nurturing_emails, + manage_crisis_communication, + create_interactive_content, + handle_media_relations, + create_testimonial_video, + manage_event_sponsorship, + optimize_conversion_funnel, + run_influencer_marketing_campaign, + analyze_website_traffic, + develop_customer_personas, + get_marketing_tools, ) -# Set mock environment variables for Azure and CosmosDB -os.environ["COSMOSDB_ENDPOINT"] = "https://mock-endpoint" -os.environ["COSMOSDB_KEY"] = "mock-key" -os.environ["COSMOSDB_DATABASE"] = "mock-database" -os.environ["COSMOSDB_CONTAINER"] = "mock-container" -os.environ["AZURE_OPENAI_DEPLOYMENT_NAME"] = "mock-deployment-name" -os.environ["AZURE_OPENAI_API_VERSION"] = "2023-01-01" -os.environ["AZURE_OPENAI_ENDPOINT"] = "https://mock-openai-endpoint" - -# Mock Azure dependencies -sys.modules["azure.monitor.events.extension"] = MagicMock() +# ------------------ Tests for marketing functions ------------------ - -# Test cases @pytest.mark.asyncio async def test_create_marketing_campaign(): result = await create_marketing_campaign("Holiday Sale", "Millennials", 10000) @@ -76,70 +106,35 @@ async def test_conduct_customer_survey(): assert "Customer survey on 'Customer Satisfaction' conducted targeting 'Frequent Buyers'." in result -@pytest.mark.asyncio -async def test_generate_marketing_report(): - result = await generate_marketing_report("Winter Campaign") - assert "Marketing report generated for campaign 'Winter Campaign'." in result - - @pytest.mark.asyncio async def test_perform_competitor_analysis(): result = await perform_competitor_analysis("Competitor A") assert "Competitor analysis performed on 'Competitor A'." 
in result -@pytest.mark.asyncio -async def test_perform_competitor_analysis_empty_input(): - result = await perform_competitor_analysis("") - assert "Competitor analysis performed on ''." in result - - @pytest.mark.asyncio async def test_optimize_seo_strategy(): result = await optimize_seo_strategy(["keyword1", "keyword2"]) assert "SEO strategy optimized with keywords: keyword1, keyword2." in result -@pytest.mark.asyncio -async def test_optimize_seo_strategy_empty_keywords(): - result = await optimize_seo_strategy([]) - assert "SEO strategy optimized with keywords: ." in result - - @pytest.mark.asyncio async def test_schedule_marketing_event(): result = await schedule_marketing_event("Product Launch", "2025-01-30", "Main Hall") assert "Marketing event 'Product Launch' scheduled on 2025-01-30 at Main Hall." in result -@pytest.mark.asyncio -async def test_schedule_marketing_event_empty_details(): - result = await schedule_marketing_event("", "", "") - assert "Marketing event '' scheduled on at ." in result - - @pytest.mark.asyncio async def test_design_promotional_material(): result = await design_promotional_material("Spring Sale", "poster") + # Note: The function capitalizes the material_type using .capitalize() assert "Poster for campaign 'Spring Sale' designed." in result @pytest.mark.asyncio -async def test_design_promotional_material_empty_input(): - result = await design_promotional_material("", "") - assert " for campaign '' designed." in result - - -@pytest.mark.asyncio -async def test_manage_email_marketing_large_email_list(): - result = await manage_email_marketing("Holiday Offers", 100000) - assert "Email marketing managed for campaign 'Holiday Offers' targeting 100000 recipients." in result - - -@pytest.mark.asyncio -async def test_manage_email_marketing_zero_recipients(): - result = await manage_email_marketing("Holiday Offers", 0) - assert "Email marketing managed for campaign 'Holiday Offers' targeting 0 recipients." in result +async def test_manage_email_marketing(): + result = await manage_email_marketing("Holiday Offers", 5000) + assert "Email marketing managed for campaign 'Holiday Offers' targeting 5000 recipients." in result @pytest.mark.asyncio @@ -149,21 +144,21 @@ async def test_track_campaign_performance(): @pytest.mark.asyncio -async def test_track_campaign_performance_empty_name(): - result = await track_campaign_performance("") - assert "Performance of campaign '' tracked." in result +async def test_coordinate_with_sales_team(): + result = await coordinate_with_sales_team("Spring Campaign") + assert "Campaign 'Spring Campaign' coordinated with the sales team." in result @pytest.mark.asyncio -async def test_create_content_calendar(): - result = await create_content_calendar("March") - assert "Content calendar for 'March' created." in result +async def test_develop_brand_strategy(): + result = await develop_brand_strategy("MyBrand") + assert "Brand strategy developed for 'MyBrand'." in result @pytest.mark.asyncio -async def test_create_content_calendar_empty_month(): - result = await create_content_calendar("") - assert "Content calendar for '' created." in result +async def test_create_content_calendar(): + result = await create_content_calendar("March") + assert "Content calendar for 'March' created." in result @pytest.mark.asyncio @@ -172,414 +167,236 @@ async def test_update_website_content(): assert "Website content on page 'Homepage' updated." 
in result -@pytest.mark.asyncio -async def test_update_website_content_empty_page(): - result = await update_website_content("") - assert "Website content on page '' updated." in result - - @pytest.mark.asyncio async def test_plan_product_launch(): result = await plan_product_launch("Smartwatch", "2025-02-15") assert "Product launch for 'Smartwatch' planned on 2025-02-15." in result -@pytest.mark.asyncio -async def test_plan_product_launch_empty_input(): - result = await plan_product_launch("", "") - assert "Product launch for '' planned on ." in result - - -@pytest.mark.asyncio -async def test_handle_customer_feedback(): - result = await handle_customer_feedback("Great service!") - assert "Customer feedback handled: Great service!" in result - - -@pytest.mark.asyncio -async def test_handle_customer_feedback_empty_feedback(): - result = await handle_customer_feedback("") - assert "Customer feedback handled: " in result - - @pytest.mark.asyncio async def test_generate_press_release(): - result = await generate_press_release("Key updates for the press release.") - assert "Identify the content." in result - assert "generate a press release based on this content Key updates for the press release." in result - - -@pytest.mark.asyncio -async def test_generate_press_release_empty_content(): - result = await generate_press_release("") - assert "generate a press release based on this content " in result - - -@pytest.mark.asyncio -async def test_generate_marketing_report_empty_name(): - result = await generate_marketing_report("") - assert "Marketing report generated for campaign ''." in result - - -@pytest.mark.asyncio -async def test_run_ppc_campaign(): - result = await run_ppc_campaign("Spring PPC", 10000.00) - assert "PPC campaign 'Spring PPC' run with a budget of $10000.00." in result - - -@pytest.mark.asyncio -async def test_run_ppc_campaign_zero_budget(): - result = await run_ppc_campaign("Spring PPC", 0.00) - assert "PPC campaign 'Spring PPC' run with a budget of $0.00." in result - - -@pytest.mark.asyncio -async def test_run_ppc_campaign_large_budget(): - result = await run_ppc_campaign("Spring PPC", 1e7) - assert "PPC campaign 'Spring PPC' run with a budget of $10000000.00." in result - - -@pytest.mark.asyncio -async def test_generate_social_media_posts_no_campaign_name(): - """Test generating social media posts with no campaign name.""" - result = await generate_social_media_posts("", ["Twitter", "LinkedIn"]) - assert "Social media posts for campaign '' generated for platforms: Twitter, LinkedIn." in result - - -@pytest.mark.asyncio -async def test_plan_advertising_budget_negative_value(): - """Test planning an advertising budget with a negative value.""" - result = await plan_advertising_budget("Summer Sale", -10000) - assert "Advertising budget planned for campaign 'Summer Sale' with a total budget of $-10000.00." in result + result = await generate_press_release("Key updates for press release.") + # Check for a substring that indicates the press release is generated. + assert "generate a press release based on this content Key updates for press release." in result @pytest.mark.asyncio -async def test_conduct_customer_survey_invalid_target_group(): - """Test conducting a survey with an invalid target group.""" - result = await conduct_customer_survey("Product Feedback", None) - assert "Customer survey on 'Product Feedback' conducted targeting 'None'." 
in result +async def test_conduct_market_research(): + result = await conduct_market_research("Automotive") + assert "Market research conducted on 'Automotive'." in result @pytest.mark.asyncio -async def test_manage_email_marketing_boundary(): - """Test managing email marketing with boundary cases.""" - result = await manage_email_marketing("Year-End Deals", 1) - assert "Email marketing managed for campaign 'Year-End Deals' targeting 1 recipients." in result +async def test_handle_customer_feedback(): + result = await handle_customer_feedback("Excellent service!") + assert "Customer feedback handled: Excellent service!" in result @pytest.mark.asyncio -async def test_create_marketing_campaign_no_audience(): - """Test creating a marketing campaign with no specified audience.""" - result = await create_marketing_campaign("Holiday Sale", "", 10000) - assert "Marketing campaign 'Holiday Sale' created targeting '' with a budget of $10000.00." in result +async def test_generate_marketing_report(): + result = await generate_marketing_report("Winter Campaign") + assert "Marketing report generated for campaign 'Winter Campaign'." in result @pytest.mark.asyncio -async def test_analyze_market_trends_no_industry(): - """Test analyzing market trends with no specified industry.""" - result = await analyze_market_trends("") - assert "Market trends analyzed for the '' industry." in result +async def test_manage_social_media_account(): + result = await manage_social_media_account("Twitter", "BrandX") + assert "Social media account 'BrandX' on platform 'Twitter' managed." in result @pytest.mark.asyncio -async def test_generate_social_media_posts_no_platforms(): - """Test generating social media posts with no specified platforms.""" - result = await generate_social_media_posts("Black Friday", []) - assert "Social media posts for campaign 'Black Friday' generated for platforms: ." in result +async def test_create_video_ad(): + result = await create_video_ad("Ad Title", "YouTube") + assert "Video advertisement 'Ad Title' created for platform 'YouTube'." in result @pytest.mark.asyncio -async def test_plan_advertising_budget_large_budget(): - """Test planning an advertising budget with a large value.""" - result = await plan_advertising_budget("Mega Sale", 1e9) - assert "Advertising budget planned for campaign 'Mega Sale' with a total budget of $1000000000.00." in result +async def test_conduct_focus_group(): + result = await conduct_focus_group("Product Feedback", 10) + assert "Focus group study on 'Product Feedback' conducted with 10 participants." in result @pytest.mark.asyncio -async def test_conduct_customer_survey_no_target(): - """Test conducting a customer survey with no specified target group.""" - result = await conduct_customer_survey("Product Feedback", "") - assert "Customer survey on 'Product Feedback' conducted targeting ''." in result +async def test_update_brand_guidelines(): + result = await update_brand_guidelines("BrandX", "New guidelines") + assert "Brand guidelines for 'BrandX' updated." in result @pytest.mark.asyncio -async def test_schedule_marketing_event_invalid_date(): - """Test scheduling a marketing event with an invalid date.""" - result = await schedule_marketing_event("Product Launch", "invalid-date", "Main Hall") - assert "Marketing event 'Product Launch' scheduled on invalid-date at Main Hall." 
in result +async def test_handle_influencer_collaboration(): + result = await handle_influencer_collaboration("InfluencerY", "CampaignZ") + assert "Collaboration with influencer 'InfluencerY' for campaign 'CampaignZ' handled." in result @pytest.mark.asyncio -async def test_design_promotional_material_no_type(): - """Test designing promotional material with no specified type.""" - result = await design_promotional_material("Spring Sale", "") - assert " for campaign 'Spring Sale' designed." in result +async def test_analyze_customer_behavior(): + result = await analyze_customer_behavior("SegmentA") + assert "Customer behavior in segment 'SegmentA' analyzed." in result @pytest.mark.asyncio -async def test_manage_email_marketing_no_campaign_name(): - """Test managing email marketing with no specified campaign name.""" - result = await manage_email_marketing("", 5000) - assert "Email marketing managed for campaign '' targeting 5000 recipients." in result +async def test_manage_loyalty_program(): + result = await manage_loyalty_program("Rewards", 300) + assert "Loyalty program 'Rewards' managed with 300 members." in result @pytest.mark.asyncio -async def test_track_campaign_performance_no_data(): - """Test tracking campaign performance with no data.""" - result = await track_campaign_performance(None) - assert "Performance of campaign 'None' tracked." in result +async def test_develop_content_strategy(): + result = await develop_content_strategy("ContentPlan") + assert "Content strategy 'ContentPlan' developed." in result @pytest.mark.asyncio -async def test_update_website_content_special_characters(): - """Test updating website content with a page name containing special characters.""" - result = await update_website_content("Home!@#$%^&*()Page") - assert "Website content on page 'Home!@#$%^&*()Page' updated." in result +async def test_create_infographic(): + result = await create_infographic("Top 10 Tips") + assert "Infographic 'Top 10 Tips' created." in result @pytest.mark.asyncio -async def test_plan_product_launch_past_date(): - """Test planning a product launch with a past date.""" - result = await plan_product_launch("Old Product", "2000-01-01") - assert "Product launch for 'Old Product' planned on 2000-01-01." in result +async def test_schedule_webinar(): + result = await schedule_webinar("Webinar X", "2025-03-20", "Zoom") + assert "Webinar 'Webinar X' scheduled on 2025-03-20 via Zoom." in result @pytest.mark.asyncio -async def test_handle_customer_feedback_long_text(): - """Test handling customer feedback with a very long text.""" - feedback = "Great service!" * 1000 - result = await handle_customer_feedback(feedback) - assert f"Customer feedback handled: {feedback}" in result +async def test_manage_online_reputation(): + result = await manage_online_reputation("BrandX") + assert "Online reputation for 'BrandX' managed." in result @pytest.mark.asyncio -async def test_generate_press_release_special_characters(): - """Test generating a press release with special characters in content.""" - result = await generate_press_release("Content with special characters !@#$%^&*().") - assert "generate a press release based on this content Content with special characters !@#$%^&*()." in result +async def test_run_email_ab_testing(): + result = await run_email_ab_testing("Campaign Test") + assert "A/B testing for email campaign 'Campaign Test' run." 
in result @pytest.mark.asyncio -async def test_run_ppc_campaign_negative_budget(): - """Test running a PPC campaign with a negative budget.""" - result = await run_ppc_campaign("Negative Budget Campaign", -100) - assert "PPC campaign 'Negative Budget Campaign' run with a budget of $-100.00." in result +async def test_create_podcast_episode(): + result = await create_podcast_episode("Series1", "Episode 1") + assert "Podcast episode 'Episode 1' for series 'Series1' created." in result @pytest.mark.asyncio -async def test_create_marketing_campaign_no_name(): - """Test creating a marketing campaign with no name.""" - result = await create_marketing_campaign("", "Gen Z", 10000) - assert "Marketing campaign '' created targeting 'Gen Z' with a budget of $10000.00." in result +async def test_manage_affiliate_program(): + result = await manage_affiliate_program("AffiliateX", 25) + assert "Affiliate program 'AffiliateX' managed with 25 affiliates." in result @pytest.mark.asyncio -async def test_analyze_market_trends_empty_industry(): - """Test analyzing market trends with an empty industry.""" - result = await analyze_market_trends("") - assert "Market trends analyzed for the '' industry." in result +async def test_generate_lead_magnets(): + result = await generate_lead_magnets("Free Ebook") + assert "Lead magnet 'Free Ebook' generated." in result @pytest.mark.asyncio -async def test_plan_advertising_budget_no_campaign_name(): - """Test planning an advertising budget with no campaign name.""" - result = await plan_advertising_budget("", 20000) - assert "Advertising budget planned for campaign '' with a total budget of $20000.00." in result +async def test_organize_trade_show(): + result = await organize_trade_show("B12", "Tech Expo") + assert "Trade show 'Tech Expo' organized at booth number 'B12'." in result @pytest.mark.asyncio -async def test_conduct_customer_survey_no_topic(): - """Test conducting a survey with no topic.""" - result = await conduct_customer_survey("", "Frequent Buyers") - assert "Customer survey on '' conducted targeting 'Frequent Buyers'." in result +async def test_manage_customer_retention_program(): + result = await manage_customer_retention_program("Retention2025") + assert "Customer retention program 'Retention2025' managed." in result @pytest.mark.asyncio -async def test_generate_marketing_report_no_name(): - """Test generating a marketing report with no name.""" - result = await generate_marketing_report("") - assert "Marketing report generated for campaign ''." in result +async def test_run_ppc_campaign(): + result = await run_ppc_campaign("PPC Test", 5000.00) + assert "PPC campaign 'PPC Test' run with a budget of $5000.00." in result @pytest.mark.asyncio -async def test_perform_competitor_analysis_no_competitor(): - """Test performing competitor analysis with no competitor specified.""" - result = await perform_competitor_analysis("") - assert "Competitor analysis performed on ''." in result +async def test_create_case_study(): + result = await create_case_study("Case Study 1", "ClientA") + assert "Case study 'Case Study 1' for client 'ClientA' created." in result @pytest.mark.asyncio -async def test_manage_email_marketing_no_recipients(): - """Test managing email marketing with no recipients.""" - result = await manage_email_marketing("Holiday Campaign", 0) - assert "Email marketing managed for campaign 'Holiday Campaign' targeting 0 recipients." in result - - -# Include all imports and environment setup from the original file. 
- -# New test cases added here to improve coverage: +async def test_generate_lead_nurturing_emails(): + result = await generate_lead_nurturing_emails("NurtureSeq", 5) + assert "Lead nurturing email sequence 'NurtureSeq' generated with 5 steps." in result @pytest.mark.asyncio -async def test_create_content_calendar_no_month(): - """Test creating a content calendar with no month provided.""" - result = await create_content_calendar("") - assert "Content calendar for '' created." in result +async def test_manage_crisis_communication(): + result = await manage_crisis_communication("CrisisX") + assert "Crisis communication managed for situation 'CrisisX'." in result @pytest.mark.asyncio -async def test_schedule_marketing_event_no_location(): - """Test scheduling a marketing event with no location provided.""" - result = await schedule_marketing_event("Event Name", "2025-05-01", "") - assert "Marketing event 'Event Name' scheduled on 2025-05-01 at ." in result +async def test_create_interactive_content(): + result = await create_interactive_content("Interactive Quiz") + assert "Interactive content 'Interactive Quiz' created." in result @pytest.mark.asyncio -async def test_generate_social_media_posts_missing_platforms(): - """Test generating social media posts with missing platforms.""" - result = await generate_social_media_posts("Campaign Name", []) - assert "Social media posts for campaign 'Campaign Name' generated for platforms: ." in result +async def test_handle_media_relations(): + result = await handle_media_relations("MediaCorp") + assert "Media relations handled with 'MediaCorp'." in result @pytest.mark.asyncio -async def test_handle_customer_feedback_no_text(): - """Test handling customer feedback with no feedback provided.""" - result = await handle_customer_feedback("") - assert "Customer feedback handled: " in result +async def test_create_testimonial_video(): + result = await create_testimonial_video("ClientB") + assert "Testimonial video created for client 'ClientB'." in result @pytest.mark.asyncio -async def test_develop_brand_strategy(): - """Test developing a brand strategy.""" - result = await develop_brand_strategy("My Brand") - assert "Brand strategy developed for 'My Brand'." in result +async def test_manage_event_sponsorship(): + result = await manage_event_sponsorship("Expo2025", "SponsorX") + assert "Sponsorship for event 'Expo2025' managed with sponsor 'SponsorX'." in result @pytest.mark.asyncio -async def test_create_infographic(): - """Test creating an infographic.""" - result = await create_infographic("Top 10 Marketing Tips") - assert "Infographic 'Top 10 Marketing Tips' created." in result +async def test_optimize_conversion_funnel(): + result = await optimize_conversion_funnel("Checkout") + assert "Conversion funnel stage 'Checkout' optimized." in result @pytest.mark.asyncio async def test_run_influencer_marketing_campaign(): - """Test running an influencer marketing campaign.""" - result = await run_influencer_marketing_campaign( - "Launch Campaign", ["Influencer A", "Influencer B"] - ) - assert "Influencer marketing campaign 'Launch Campaign' run with influencers: Influencer A, Influencer B." in result + result = await run_influencer_marketing_campaign("InfluenceNow", ["Influencer1", "Influencer2"]) + assert "Influencer marketing campaign 'InfluenceNow' run with influencers: Influencer1, Influencer2." 
in result @pytest.mark.asyncio -async def test_manage_loyalty_program(): - """Test managing a loyalty program.""" - result = await manage_loyalty_program("Rewards Club", 5000) - assert "Loyalty program 'Rewards Club' managed with 5000 members." in result +async def test_analyze_website_traffic(): + result = await analyze_website_traffic("Google") + assert "Website traffic analyzed from source 'Google'." in result @pytest.mark.asyncio -async def test_create_marketing_campaign_empty_fields(): - """Test creating a marketing campaign with empty fields.""" - result = await create_marketing_campaign("", "", 0) - assert "Marketing campaign '' created targeting '' with a budget of $0.00." in result - - -@pytest.mark.asyncio -async def test_plan_product_launch_empty_fields(): - """Test planning a product launch with missing fields.""" - result = await plan_product_launch("", "") - assert "Product launch for '' planned on ." in result - - -@pytest.mark.asyncio -async def test_get_marketing_tools(): - """Test retrieving the list of marketing tools.""" - tools = get_marketing_tools() - assert len(tools) > 0 - assert all(isinstance(tool, FunctionTool) for tool in tools) - - -@pytest.mark.asyncio -async def test_get_marketing_tools_complete(): - """Test that all tools are included in the marketing tools list.""" - tools = get_marketing_tools() - assert len(tools) > 40 # Assuming there are more than 40 tools - assert any(tool.name == "create_marketing_campaign" for tool in tools) - assert all(isinstance(tool, FunctionTool) for tool in tools) - - -@pytest.mark.asyncio -async def test_schedule_marketing_event_invalid_location(): - """Test scheduling a marketing event with invalid location.""" - result = await schedule_marketing_event("Event Name", "2025-12-01", None) - assert "Marketing event 'Event Name' scheduled on 2025-12-01 at None." in result - - -@pytest.mark.asyncio -async def test_plan_product_launch_no_date(): - """Test planning a product launch with no launch date.""" - result = await plan_product_launch("Product X", None) - assert "Product launch for 'Product X' planned on None." in result - - -@pytest.mark.asyncio -async def test_handle_customer_feedback_none(): - """Test handling customer feedback with None.""" - result = await handle_customer_feedback(None) - assert "Customer feedback handled: None" in result - - -@pytest.mark.asyncio -async def test_generate_press_release_no_key_info(): - """Test generating a press release with no key information.""" - result = await generate_press_release("") - assert "generate a press release based on this content " in result - - -@pytest.mark.asyncio -async def test_schedule_marketing_event_invalid_inputs(): - """Test scheduling marketing event with invalid inputs.""" - result = await schedule_marketing_event("", None, None) - assert "Marketing event '' scheduled on None at None." in result - - -@pytest.mark.asyncio -async def test_plan_product_launch_invalid_date(): - """Test planning a product launch with invalid date.""" - result = await plan_product_launch("New Product", "not-a-date") - assert "Product launch for 'New Product' planned on not-a-date." 
in result - - -@pytest.mark.asyncio -async def test_handle_customer_feedback_empty_input(): - """Test handling customer feedback with empty input.""" - result = await handle_customer_feedback("") - assert "Customer feedback handled: " in result - - -@pytest.mark.asyncio -async def test_manage_email_marketing_invalid_recipients(): - """Test managing email marketing with invalid recipients.""" - result = await manage_email_marketing("Campaign X", -5) - assert "Email marketing managed for campaign 'Campaign X' targeting -5 recipients." in result - - -@pytest.mark.asyncio -async def test_track_campaign_performance_none(): - """Test tracking campaign performance with None.""" - result = await track_campaign_performance(None) - assert "Performance of campaign 'None' tracked." in result +async def test_develop_customer_personas(): + result = await develop_customer_personas("Millennials") + assert "Customer personas developed for segment 'Millennials'." in result +# ------------------ Tests for the MarketingAgent class ------------------ @pytest.fixture -def mock_agent_dependencies(): - """Provide mocked dependencies for the MarketingAgent.""" +def marketing_agent_dependencies(): + from autogen_core.components.models import AzureOpenAIChatCompletionClient return { - "mock_model_client": MagicMock(), - "mock_session_id": "session123", - "mock_user_id": "user123", - "mock_context": MagicMock(), - "mock_tools": [MagicMock()], - "mock_agent_id": "agent123", + "model_client": MagicMock(spec=AzureOpenAIChatCompletionClient), + "session_id": "sess_marketing", + "user_id": "user_marketing", + "model_context": MagicMock(), # This would be an instance of CosmosBufferedChatCompletionContext in production + "marketing_tools": get_marketing_tools(), + "marketing_tool_agent_id": ("marketing_agent", "sess_marketing"), } + + +def test_get_marketing_tools_complete(): + tools = get_marketing_tools() + # Check that there are many tools (for example, more than 40) + assert len(tools) > 40 + # Check that specific tool names are included. 
+ tool_names = [tool.name for tool in tools] + for name in [ + "create_marketing_campaign", + "analyze_market_trends", + "generate_social_media_posts", + "plan_advertising_budget", + "conduct_customer_survey", + ]: + assert name in tool_names diff --git a/src/backend/tests/agents/test_planner.py b/src/backend/tests/agents/test_planner.py index 957823ce5..f76b484f0 100644 --- a/src/backend/tests/agents/test_planner.py +++ b/src/backend/tests/agents/test_planner.py @@ -1,9 +1,11 @@ +# src/backend/tests/agents/test_planner.py import os import sys -from unittest.mock import AsyncMock, MagicMock, patch +import json import pytest +from unittest.mock import AsyncMock, MagicMock -# Set environment variables before importing anything +# --- Setup environment and module search path --- os.environ["COSMOSDB_ENDPOINT"] = "https://mock-endpoint" os.environ["COSMOSDB_KEY"] = "mock-key" os.environ["COSMOSDB_DATABASE"] = "mock-database" @@ -11,175 +13,226 @@ os.environ["AZURE_OPENAI_DEPLOYMENT_NAME"] = "mock-deployment-name" os.environ["AZURE_OPENAI_API_VERSION"] = "2023-01-01" os.environ["AZURE_OPENAI_ENDPOINT"] = "https://mock-openai-endpoint" +sys.modules["azure.monitor.events.extension"] = MagicMock() # Patch missing azure module -# Mock `azure.monitor.events.extension` globally -sys.modules["azure.monitor.events.extension"] = MagicMock() -sys.modules["event_utils"] = MagicMock() -# Import modules after setting environment variables -from src.backend.agents.planner import PlannerAgent -from src.backend.models.messages import InputTask, HumanClarification, Plan, PlanStatus -from src.backend.context.cosmos_memory import CosmosBufferedChatCompletionContext - - -@pytest.fixture -def mock_context(): - """Mock the CosmosBufferedChatCompletionContext.""" - return MagicMock(spec=CosmosBufferedChatCompletionContext) - - -@pytest.fixture -def mock_model_client(): - """Mock the Azure OpenAI model client.""" - return MagicMock() - - -@pytest.fixture -def mock_runtime_context(): - """Mock the runtime context for AgentInstantiationContext.""" - with patch( - "autogen_core.base._agent_instantiation.AgentInstantiationContext.AGENT_INSTANTIATION_CONTEXT_VAR", - new=MagicMock(), - ) as mock_context_var: - yield mock_context_var +# Ensure the project root is in sys.path. 
+sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../"))) +from autogen_core.base._agent_instantiation import AgentInstantiationContext -@pytest.fixture -def planner_agent(mock_model_client, mock_context, mock_runtime_context): - """Return an instance of PlannerAgent with mocked dependencies.""" - mock_runtime_context.get.return_value = (MagicMock(), "mock-agent-id") - return PlannerAgent( - model_client=mock_model_client, - session_id="test-session", - user_id="test-user", - memory=mock_context, - available_agents=["HumanAgent", "MarketingAgent", "TechSupportAgent"], - agent_tools_list=["tool1", "tool2"], - ) +@pytest.fixture(autouse=True) +def patch_instantiation_context(monkeypatch): + monkeypatch.setattr(AgentInstantiationContext, "current_runtime", lambda: "dummy_runtime") + monkeypatch.setattr(AgentInstantiationContext, "current_agent_id", lambda: "dummy_agent_id") -@pytest.mark.asyncio -async def test_handle_plan_clarification(planner_agent, mock_context): - """Test the handle_plan_clarification method.""" - mock_clarification = HumanClarification( - session_id="test-session", - plan_id="plan-1", - human_clarification="Test clarification", - ) - mock_context.get_plan_by_session = AsyncMock( - return_value=Plan( - id="plan-1", - session_id="test-session", - user_id="test-user", - initial_goal="Test Goal", - overall_status="in_progress", +# --- Imports from the module under test --- +from autogen_core.components.models import UserMessage +from autogen_core.base import MessageContext +from src.backend.agents.planner import PlannerAgent +from src.backend.models.messages import ( + BAgentType, + InputTask, + Plan, + PlanStatus, + Step, + StepStatus, + HumanFeedbackStatus, +) + + +class DummyMessageContext(MessageContext): + def __init__(self, sender="dummy_sender", topic_id="dummy_topic", is_rpc=False, cancellation_token=None): + self.sender = sender + self.topic_id = topic_id + self.is_rpc = is_rpc + self.cancellation_token = cancellation_token + + +class FakeMemory: + def __init__(self): + self.added_plans = [] + self.added_steps = [] + self.added_items = [] + self.updated_plan = None + self.updated_steps = [] + + async def add_plan(self, plan): + self.added_plans.append(plan) + + async def add_step(self, step): + self.added_steps.append(step) + + async def add_item(self, item): + self.added_items.append(item) + + async def update_plan(self, plan): + self.updated_plan = plan + + async def update_step(self, step): + self.updated_steps.append(step) + + async def get_plan_by_session(self, session_id: str) -> Plan: + return Plan( + id="plan_test", + session_id=session_id, + user_id="user_test", + initial_goal="Test initial goal", + overall_status=PlanStatus.in_progress, source="PlannerAgent", - summary="Mock Summary", - human_clarification_request=None, + summary="Test summary", + human_clarification_request="Test clarification", ) - ) - mock_context.update_plan = AsyncMock() - mock_context.add_item = AsyncMock() - await planner_agent.handle_plan_clarification(mock_clarification, None) - - mock_context.get_plan_by_session.assert_called_with(session_id="test-session") - mock_context.update_plan.assert_called() - mock_context.add_item.assert_called() + async def get_steps_by_plan(self, plan_id: str) -> list: + step = Step( + id="step_test", + plan_id=plan_id, + action="Test step action", + agent=BAgentType.human_agent, + status=StepStatus.planned, + session_id="session_test", + user_id="user_test", + human_approval_status=HumanFeedbackStatus.requested, + ) + 
return [step] +# --- Dummy model client simulating LLM responses --- -@pytest.mark.asyncio -async def test_generate_instruction_with_special_characters(planner_agent): - """Test _generate_instruction with special characters in the objective.""" - special_objective = "Solve this task: @$%^&*()" - instruction = planner_agent._generate_instruction(special_objective) - assert "Solve this task: @$%^&*()" in instruction - assert "HumanAgent" in instruction - assert "tool1" in instruction +class DummyModelClient: + async def create(self, messages, extra_create_args=None): + # Simulate a valid structured response based on the expected schema. + response_dict = { + "initial_goal": "Achieve test goal", + "steps": [{"action": "Do step 1", "agent": BAgentType.human_agent.value}], + "summary_plan_and_steps": "Test plan summary", + "human_clarification_request": "Need details" + } + dummy_resp = MagicMock() + dummy_resp.content = json.dumps(response_dict) + return dummy_resp +# --- Fixture for PlannerAgent --- -@pytest.mark.asyncio -async def test_handle_plan_clarification_updates_plan_correctly(planner_agent, mock_context): - """Test handle_plan_clarification ensures correct plan updates.""" - mock_clarification = HumanClarification( - session_id="test-session", - plan_id="plan-1", - human_clarification="Updated clarification text", - ) - mock_plan = Plan( - id="plan-1", - session_id="test-session", - user_id="test-user", - initial_goal="Test Goal", - overall_status="in_progress", - source="PlannerAgent", - summary="Mock Summary", - human_clarification_request="Previous clarification needed", +@pytest.fixture +def planner_agent(): + dummy_model_client = DummyModelClient() + session_id = "session_test" + user_id = "user_test" + fake_memory = FakeMemory() + available_agents = [BAgentType.human_agent, BAgentType.tech_support_agent] + agent_tools_list = ["tool1", "tool2"] + agent = PlannerAgent( + model_client=dummy_model_client, + session_id=session_id, + user_id=user_id, + memory=fake_memory, + available_agents=available_agents, + agent_tools_list=agent_tools_list, ) + return agent, fake_memory - mock_context.get_plan_by_session = AsyncMock(return_value=mock_plan) - mock_context.update_plan = AsyncMock() - - await planner_agent.handle_plan_clarification(mock_clarification, None) - - assert mock_plan.human_clarification_response == "Updated clarification text" - mock_context.update_plan.assert_called_with(mock_plan) - - -@pytest.mark.asyncio -async def test_handle_input_task_with_exception(planner_agent, mock_context): - """Test handle_input_task gracefully handles exceptions.""" - input_task = InputTask(description="Test task causing exception", session_id="test-session") - planner_agent._create_structured_plan = AsyncMock(side_effect=Exception("Mocked exception")) - - with pytest.raises(Exception, match="Mocked exception"): - await planner_agent.handle_input_task(input_task, None) - - planner_agent._create_structured_plan.assert_called() - mock_context.add_item.assert_not_called() - mock_context.add_plan.assert_not_called() - mock_context.add_step.assert_not_called() +# ------------------- Tests for handle_input_task ------------------- @pytest.mark.asyncio -async def test_handle_plan_clarification_handles_memory_error(planner_agent, mock_context): - """Test handle_plan_clarification gracefully handles memory errors.""" - mock_clarification = HumanClarification( - session_id="test-session", - plan_id="plan-1", - human_clarification="Test clarification", +async def 
test_handle_input_task_success(planner_agent): + """Test that handle_input_task returns a valid plan and calls memory.add_item.""" + agent, fake_memory = planner_agent + input_task = InputTask(description="Test objective", session_id="session_test") + ctx = DummyMessageContext() + # Patch _create_structured_plan to simulate a valid LLM response. + dummy_plan = Plan( + id="plan_success", + session_id="session_test", + user_id="user_test", + initial_goal="Achieve test goal", + overall_status=PlanStatus.in_progress, + source="PlannerAgent", + summary="Dummy summary", + human_clarification_request="Request info" ) - - mock_context.get_plan_by_session = AsyncMock(side_effect=Exception("Memory error")) - - with pytest.raises(Exception, match="Memory error"): - await planner_agent.handle_plan_clarification(mock_clarification, None) - - mock_context.update_plan.assert_not_called() - mock_context.add_item.assert_not_called() + dummy_steps = [ + Step( + id="step1", + plan_id="plan_success", + action="Do step 1", + agent=BAgentType.human_agent, + status=StepStatus.planned, + session_id="session_test", + user_id="user_test", + human_approval_status=HumanFeedbackStatus.requested, + ) + ] + agent._create_structured_plan = AsyncMock(return_value=(dummy_plan, dummy_steps)) + fake_memory.add_item = AsyncMock() + result = await agent.handle_input_task(input_task, ctx) + assert result.id == "plan_success" + fake_memory.add_item.assert_called() @pytest.mark.asyncio -async def test_generate_instruction_with_missing_objective(planner_agent): - """Test _generate_instruction with a missing or empty objective.""" - instruction = planner_agent._generate_instruction("") - assert "Your objective is:" in instruction - assert "The agents you have access to are:" in instruction - assert "These agents have access to the following functions:" in instruction +async def test_handle_input_task_no_steps(planner_agent): + """Test that _create_structured_plan raising ValueError causes exception.""" + agent, fake_memory = planner_agent + input_task = InputTask(description="Test objective", session_id="session_test") + ctx = DummyMessageContext() + # Patch _create_structured_plan to return no steps. + agent._create_structured_plan = AsyncMock(side_effect=ValueError("No steps found")) + with pytest.raises(ValueError, match="No steps found"): + await agent.handle_input_task(input_task, ctx) + +# ------------------- Tests for _generate_instruction ------------------- + + +def test_generate_instruction_contains_content(planner_agent): + agent, _ = planner_agent + instruction = agent._generate_instruction("Test objective") + assert "Test objective" in instruction + # Check that available agents and tool list are included. 
+ for ag in agent._available_agents: + # BAgentType enum values are strings via .value + assert ag.value in instruction + if agent._agent_tools_list: + for tool in agent._agent_tools_list: + assert tool in instruction + +# ------------------- Tests for _create_structured_plan ------------------- @pytest.mark.asyncio -async def test_create_structured_plan_with_error(planner_agent, mock_context): - """Test _create_structured_plan when an error occurs during plan creation.""" - planner_agent._model_client.create = AsyncMock(side_effect=Exception("Mocked error")) +async def test_create_structured_plan_success(planner_agent): + """Test _create_structured_plan returns a valid plan and steps.""" + agent, fake_memory = planner_agent + structured_response = { + "initial_goal": "Goal A", + "steps": [{"action": "Step 1 action", "agent": BAgentType.human_agent.value}], + "summary_plan_and_steps": "Plan summary A", + "human_clarification_request": "Clarify details" + } + dummy_response = MagicMock() + dummy_response.content = json.dumps(structured_response) + agent._model_client.create = AsyncMock(return_value=dummy_response) + fake_memory.add_plan = AsyncMock() + fake_memory.add_step = AsyncMock() + messages = [UserMessage(content="Dummy instruction", source="PlannerAgent")] + plan, steps = await agent._create_structured_plan(messages) + assert plan.initial_goal == "Goal A" + assert len(steps) == 1 + fake_memory.add_plan.assert_called_once() + fake_memory.add_step.assert_called_once() - messages = [{"content": "Test message", "source": "PlannerAgent"}] - plan, steps = await planner_agent._create_structured_plan(messages) - assert plan.initial_goal == "Error generating plan" +@pytest.mark.asyncio +async def test_create_structured_plan_exception(planner_agent): + """Test _create_structured_plan exception handling when model client fails.""" + agent, fake_memory = planner_agent + agent._model_client.create = AsyncMock(side_effect=Exception("LLM error")) + messages = [UserMessage(content="Dummy instruction", source="PlannerAgent")] + plan, steps = await agent._create_structured_plan(messages) assert plan.overall_status == PlanStatus.failed - assert len(steps) == 0 - mock_context.add_plan.assert_not_called() - mock_context.add_step.assert_not_called() + assert plan.id == "" + assert steps == [] diff --git a/src/backend/tests/agents/test_procurement.py b/src/backend/tests/agents/test_procurement.py index 4c214db0b..db6499574 100644 --- a/src/backend/tests/agents/test_procurement.py +++ b/src/backend/tests/agents/test_procurement.py @@ -3,10 +3,14 @@ import pytest from unittest.mock import MagicMock -# Mocking azure.monitor.events.extension globally +# --- Fake missing Azure modules --- sys.modules["azure.monitor.events.extension"] = MagicMock() -# Setting up environment variables to mock Config dependencies +# Adjust sys.path so that the project root is found. 
+# Assuming this test file is at: src/backend/tests/agents/test_procurement.py +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../"))) + +# Set required environment variables (needed by Config and other modules) os.environ["COSMOSDB_ENDPOINT"] = "https://mock-endpoint" os.environ["COSMOSDB_KEY"] = "mock-key" os.environ["COSMOSDB_DATABASE"] = "mock-database" @@ -15,7 +19,7 @@ os.environ["AZURE_OPENAI_API_VERSION"] = "2023-01-01" os.environ["AZURE_OPENAI_ENDPOINT"] = "https://mock-openai-endpoint" -# Import the procurement tools for testing +# Import procurement functions and classes from procurement.py from src.backend.agents.procurement import ( order_hardware, order_software_license, @@ -34,645 +38,113 @@ recommend_sourcing_options, update_asset_register, conduct_market_research, + get_procurement_information, + schedule_maintenance, audit_inventory, approve_budget, + manage_warranty, + handle_customs_clearance, + negotiate_discount, + register_new_vendor, + decommission_asset, + schedule_training, + update_vendor_rating, + handle_recall, + request_samples, + manage_subscription, + verify_supplier_certification, + conduct_supplier_audit, manage_import_licenses, + conduct_cost_analysis, + evaluate_risk_factors, + manage_green_procurement_policy, + update_supplier_database, + handle_dispute_resolution, + assess_compliance, + manage_reverse_logistics, + verify_delivery, + handle_procurement_risk_assessment, + manage_supplier_contract, allocate_budget, track_procurement_metrics, + manage_inventory_levels, + conduct_supplier_survey, + get_procurement_tools, ) -# Mocking `track_event_if_configured` for tests -sys.modules["src.backend.event_utils"] = MagicMock() - - -@pytest.mark.asyncio -async def test_order_hardware(): - result = await order_hardware("laptop", 10) - assert "Ordered 10 units of laptop." in result - - -@pytest.mark.asyncio -async def test_order_software_license(): - result = await order_software_license("Photoshop", "team", 5) - assert "Ordered 5 team licenses of Photoshop." in result - - -@pytest.mark.asyncio -async def test_check_inventory(): - result = await check_inventory("printer") - assert "Inventory status of printer: In Stock." in result - - -@pytest.mark.asyncio -async def test_process_purchase_order(): - result = await process_purchase_order("PO12345") - assert "Purchase Order PO12345 has been processed." in result - - -@pytest.mark.asyncio -async def test_initiate_contract_negotiation(): - result = await initiate_contract_negotiation("VendorX", "Exclusive deal for 2025") - assert ( - "Contract negotiation initiated with VendorX: Exclusive deal for 2025" in result - ) - - -@pytest.mark.asyncio -async def test_approve_invoice(): - result = await approve_invoice("INV001") - assert "Invoice INV001 approved for payment." in result - - -@pytest.mark.asyncio -async def test_track_order(): - result = await track_order("ORDER123") - assert "Order ORDER123 is currently in transit." in result - - -@pytest.mark.asyncio -async def test_manage_vendor_relationship(): - result = await manage_vendor_relationship("VendorY", "renewed") - assert "Vendor relationship with VendorY has been renewed." in result - - -@pytest.mark.asyncio -async def test_update_procurement_policy(): - result = await update_procurement_policy( - "Policy2025", "Updated terms and conditions" - ) - assert "Procurement policy 'Policy2025' updated." 
in result - - -@pytest.mark.asyncio -async def test_generate_procurement_report(): - result = await generate_procurement_report("Annual") - assert "Generated Annual procurement report." in result - - -@pytest.mark.asyncio -async def test_evaluate_supplier_performance(): - result = await evaluate_supplier_performance("SupplierZ") - assert "Performance evaluation for supplier SupplierZ completed." in result - - -@pytest.mark.asyncio -async def test_handle_return(): - result = await handle_return("Laptop", 3, "Defective screens") - assert "Processed return of 3 units of Laptop due to Defective screens." in result - - -@pytest.mark.asyncio -async def test_process_payment(): - result = await process_payment("VendorA", 5000.00) - assert "Processed payment of $5000.00 to VendorA." in result - - -@pytest.mark.asyncio -async def test_request_quote(): - result = await request_quote("Tablet", 20) - assert "Requested quote for 20 units of Tablet." in result - - -@pytest.mark.asyncio -async def test_recommend_sourcing_options(): - result = await recommend_sourcing_options("Projector") - assert "Sourcing options for Projector have been provided." in result - - -@pytest.mark.asyncio -async def test_update_asset_register(): - result = await update_asset_register("ServerX", "Deployed in Data Center") - assert "Asset register updated for ServerX: Deployed in Data Center" in result - - -@pytest.mark.asyncio -async def test_conduct_market_research(): - result = await conduct_market_research("Electronics") - assert "Market research conducted for category: Electronics" in result - - -@pytest.mark.asyncio -async def test_audit_inventory(): - result = await audit_inventory() - assert "Inventory audit has been conducted." in result - - -@pytest.mark.asyncio -async def test_approve_budget(): - result = await approve_budget("BUD001", 25000.00) - assert "Approved budget ID BUD001 for amount $25000.00." in result - - -@pytest.mark.asyncio -async def test_manage_import_licenses(): - result = await manage_import_licenses("Smartphones", "License12345") - assert "Import license for Smartphones managed: License12345." in result - - -@pytest.mark.asyncio -async def test_allocate_budget(): - result = await allocate_budget("IT Department", 150000.00) - assert "Allocated budget of $150000.00 to IT Department." in result - - -@pytest.mark.asyncio -async def test_track_procurement_metrics(): - result = await track_procurement_metrics("Cost Savings") - assert "Procurement metric 'Cost Savings' tracked." in result - - -@pytest.mark.asyncio -async def test_order_hardware_invalid_quantity(): - result = await order_hardware("printer", 0) - assert "Ordered 0 units of printer." in result - - -@pytest.mark.asyncio -async def test_order_software_license_invalid_type(): - result = await order_software_license("Photoshop", "", 5) - assert "Ordered 5 licenses of Photoshop." in result - - -@pytest.mark.asyncio -async def test_check_inventory_empty_item(): - result = await check_inventory("") - assert "Inventory status of : In Stock." in result - - -@pytest.mark.asyncio -async def test_process_purchase_order_empty(): - result = await process_purchase_order("") - assert "Purchase Order has been processed." 
in result - - -@pytest.mark.asyncio -async def test_initiate_contract_negotiation_empty_details(): - result = await initiate_contract_negotiation("", "") - assert "Contract negotiation initiated with : " in result - - -@pytest.mark.asyncio -async def test_approve_invoice_empty(): - result = await approve_invoice("") - assert "Invoice approved for payment." in result - - -@pytest.mark.asyncio -async def test_track_order_empty_order(): - result = await track_order("") - assert "Order is currently in transit." in result - - -@pytest.mark.asyncio -async def test_manage_vendor_relationship_empty_action(): - result = await manage_vendor_relationship("VendorA", "") - assert "Vendor relationship with VendorA has been ." in result - - -@pytest.mark.asyncio -async def test_update_procurement_policy_no_content(): - result = await update_procurement_policy("Policy2025", "") - assert "Procurement policy 'Policy2025' updated." in result - - -@pytest.mark.asyncio -async def test_generate_procurement_report_empty_type(): - result = await generate_procurement_report("") - assert "Generated procurement report." in result - - -@pytest.mark.asyncio -async def test_evaluate_supplier_performance_empty_name(): - result = await evaluate_supplier_performance("") - assert "Performance evaluation for supplier completed." in result - - -@pytest.mark.asyncio -async def test_handle_return_negative_quantity(): - result = await handle_return("Monitor", -5, "Damaged") - assert "Processed return of -5 units of Monitor due to Damaged." in result - - -@pytest.mark.asyncio -async def test_process_payment_zero_amount(): - result = await process_payment("VendorB", 0.00) - assert "Processed payment of $0.00 to VendorB." in result - - -@pytest.mark.asyncio -async def test_request_quote_empty_item(): - result = await request_quote("", 10) - assert "Requested quote for 10 units of ." in result - - -@pytest.mark.asyncio -async def test_recommend_sourcing_options_empty_item(): - result = await recommend_sourcing_options("") - assert "Sourcing options for have been provided." in result - - -@pytest.mark.asyncio -async def test_update_asset_register_empty_details(): - result = await update_asset_register("AssetX", "") - assert "Asset register updated for AssetX: " in result - - -@pytest.mark.asyncio -async def test_conduct_market_research_empty_category(): - result = await conduct_market_research("") - assert "Market research conducted for category: " in result - - -@pytest.mark.asyncio -async def test_audit_inventory_double_call(): - result1 = await audit_inventory() - result2 = await audit_inventory() - assert result1 == "Inventory audit has been conducted." - assert result2 == "Inventory audit has been conducted." - - -@pytest.mark.asyncio -async def test_approve_budget_negative_amount(): - result = await approve_budget("BUD002", -1000.00) - assert "Approved budget ID BUD002 for amount $-1000.00." in result - - -@pytest.mark.asyncio -async def test_manage_import_licenses_empty_license(): - result = await manage_import_licenses("Electronics", "") - assert "Import license for Electronics managed: ." in result - - -@pytest.mark.asyncio -async def test_allocate_budget_negative_value(): - result = await allocate_budget("HR Department", -50000.00) - assert "Allocated budget of $-50000.00 to HR Department." in result - - -@pytest.mark.asyncio -async def test_track_procurement_metrics_empty_metric(): - result = await track_procurement_metrics("") - assert "Procurement metric '' tracked." 
in result - - -@pytest.mark.asyncio -async def test_handle_return_zero_quantity(): - result = await handle_return("Monitor", 0, "Packaging error") - assert "Processed return of 0 units of Monitor due to Packaging error." in result - - -@pytest.mark.asyncio -async def test_order_hardware_large_quantity(): - result = await order_hardware("Monitor", 1000000) - assert "Ordered 1000000 units of Monitor." in result - -@pytest.mark.asyncio -async def test_process_payment_large_amount(): - result = await process_payment("VendorX", 10000000.99) - assert "Processed payment of $10000000.99 to VendorX." in result - - -@pytest.mark.asyncio -async def test_track_order_invalid_number(): - result = await track_order("INVALID123") - assert "Order INVALID123 is currently in transit." in result - - -@pytest.mark.asyncio -async def test_initiate_contract_negotiation_long_details(): - long_details = ( - "This is a very long contract negotiation detail for testing purposes. " * 10 - ) - result = await initiate_contract_negotiation("VendorY", long_details) - assert "Contract negotiation initiated with VendorY" in result - assert long_details in result - - -@pytest.mark.asyncio -async def test_manage_vendor_relationship_invalid_action(): - result = await manage_vendor_relationship("VendorZ", "undefined") - assert "Vendor relationship with VendorZ has been undefined." in result - - -@pytest.mark.asyncio -async def test_update_procurement_policy_no_policy_name(): - result = await update_procurement_policy("", "Updated policy details") - assert "Procurement policy '' updated." in result - - -@pytest.mark.asyncio -async def test_generate_procurement_report_invalid_type(): - result = await generate_procurement_report("Nonexistent") - assert "Generated Nonexistent procurement report." in result - - -@pytest.mark.asyncio -async def test_evaluate_supplier_performance_no_supplier_name(): - result = await evaluate_supplier_performance("") - assert "Performance evaluation for supplier completed." in result - - -@pytest.mark.asyncio -async def test_manage_import_licenses_no_item_name(): - result = await manage_import_licenses("", "License123") - assert "Import license for managed: License123." in result - - -@pytest.mark.asyncio -async def test_allocate_budget_zero_value(): - result = await allocate_budget("Operations", 0) - assert "Allocated budget of $0.00 to Operations." in result - - -@pytest.mark.asyncio -async def test_audit_inventory_multiple_calls(): - result1 = await audit_inventory() - result2 = await audit_inventory() - assert result1 == "Inventory audit has been conducted." - assert result2 == "Inventory audit has been conducted." - - -@pytest.mark.asyncio -async def test_approve_budget_large_amount(): - result = await approve_budget("BUD123", 1e9) - assert "Approved budget ID BUD123 for amount $1000000000.00." in result - - -@pytest.mark.asyncio -async def test_request_quote_no_quantity(): - result = await request_quote("Laptop", 0) - assert "Requested quote for 0 units of Laptop." in result - - -@pytest.mark.asyncio -async def test_conduct_market_research_no_category(): - result = await conduct_market_research("") - assert "Market research conducted for category: " in result - - -@pytest.mark.asyncio -async def test_track_procurement_metrics_no_metric_name(): - result = await track_procurement_metrics("") - assert "Procurement metric '' tracked." 
in result - - -@pytest.mark.asyncio -async def test_order_hardware_no_item_name(): - """Test line 98: Edge case where item name is empty.""" - result = await order_hardware("", 5) - assert "Ordered 5 units of ." in result - - -@pytest.mark.asyncio -async def test_order_hardware_negative_quantity(): - """Test line 108: Handle negative quantities.""" - result = await order_hardware("Keyboard", -5) - assert "Ordered -5 units of Keyboard." in result - - -@pytest.mark.asyncio -async def test_order_software_license_no_license_type(): - """Test line 123: License type missing.""" - result = await order_software_license("Photoshop", "", 10) - assert "Ordered 10 licenses of Photoshop." in result - - -@pytest.mark.asyncio -async def test_order_software_license_no_quantity(): - """Test line 128: Quantity missing.""" - result = await order_software_license("Photoshop", "team", 0) - assert "Ordered 0 team licenses of Photoshop." in result - - -@pytest.mark.asyncio -async def test_process_purchase_order_invalid_number(): - """Test line 133: Invalid purchase order number.""" - result = await process_purchase_order("") - assert "Purchase Order has been processed." in result - - -@pytest.mark.asyncio -async def test_check_inventory_empty_item_name(): - """Test line 138: Inventory check for an empty item.""" - result = await check_inventory("") - assert "Inventory status of : In Stock." in result - - -@pytest.mark.asyncio -async def test_initiate_contract_negotiation_empty_vendor(): - """Test line 143: Contract negotiation with empty vendor name.""" - result = await initiate_contract_negotiation("", "Sample contract") - assert "Contract negotiation initiated with : Sample contract" in result - - -@pytest.mark.asyncio -async def test_update_procurement_policy_empty_policy_name(): - """Test line 158: Empty policy name.""" - result = await update_procurement_policy("", "New terms") - assert "Procurement policy '' updated." in result - - -@pytest.mark.asyncio -async def test_evaluate_supplier_performance_no_name(): - """Test line 168: Empty supplier name.""" - result = await evaluate_supplier_performance("") - assert "Performance evaluation for supplier completed." in result - - -@pytest.mark.asyncio -async def test_handle_return_empty_reason(): - """Test line 173: Handle return with no reason provided.""" - result = await handle_return("Laptop", 2, "") - assert "Processed return of 2 units of Laptop due to ." in result - - -@pytest.mark.asyncio -async def test_process_payment_no_vendor_name(): - """Test line 178: Payment processing with no vendor name.""" - result = await process_payment("", 500.00) - assert "Processed payment of $500.00 to ." in result - - -@pytest.mark.asyncio -async def test_manage_import_licenses_no_details(): - """Test line 220: Import licenses with empty details.""" - result = await manage_import_licenses("Smartphones", "") - assert "Import license for Smartphones managed: ." in result - - -@pytest.mark.asyncio -async def test_allocate_budget_no_department_name(): - """Test line 255: Allocate budget with empty department name.""" - result = await allocate_budget("", 1000.00) - assert "Allocated budget of $1000.00 to ." in result - - -@pytest.mark.asyncio -async def test_track_procurement_metrics_no_metric(): - """Test line 540: Track metrics with empty metric name.""" - result = await track_procurement_metrics("") - assert "Procurement metric '' tracked." 
in result - - -@pytest.mark.asyncio -async def test_handle_return_negative_and_zero_quantity(): - """Covers lines 173, 178.""" - result_negative = await handle_return("Laptop", -5, "Damaged") - result_zero = await handle_return("Laptop", 0, "Packaging Issue") - assert "Processed return of -5 units of Laptop due to Damaged." in result_negative - assert ( - "Processed return of 0 units of Laptop due to Packaging Issue." in result_zero - ) - - -@pytest.mark.asyncio -async def test_process_payment_no_vendor_name_large_amount(): - """Covers line 188.""" - result_empty_vendor = await process_payment("", 1000000.00) - assert "Processed payment of $1000000.00 to ." in result_empty_vendor - - -@pytest.mark.asyncio -async def test_request_quote_edge_cases(): - """Covers lines 193, 198.""" - result_no_quantity = await request_quote("Tablet", 0) - result_negative_quantity = await request_quote("Tablet", -10) - assert "Requested quote for 0 units of Tablet." in result_no_quantity - assert "Requested quote for -10 units of Tablet." in result_negative_quantity - - -@pytest.mark.asyncio -async def test_update_asset_register_no_details(): - """Covers line 203.""" - result = await update_asset_register("ServerX", "") - assert "Asset register updated for ServerX: " in result - - -@pytest.mark.asyncio -async def test_audit_inventory_multiple_runs(): - """Covers lines 213.""" - result1 = await audit_inventory() - result2 = await audit_inventory() - assert result1 == "Inventory audit has been conducted." - assert result2 == "Inventory audit has been conducted." - - -@pytest.mark.asyncio -async def test_approve_budget_negative_and_zero_amount(): - """Covers lines 220, 225.""" - result_zero = await approve_budget("BUD123", 0.00) - result_negative = await approve_budget("BUD124", -500.00) - assert "Approved budget ID BUD123 for amount $0.00." in result_zero - assert "Approved budget ID BUD124 for amount $-500.00." in result_negative - - -@pytest.mark.asyncio -async def test_manage_import_licenses_no_license_details(): - """Covers lines 230, 235.""" - result_empty_license = await manage_import_licenses("Smartphones", "") - result_no_item = await manage_import_licenses("", "License12345") - assert "Import license for Smartphones managed: ." in result_empty_license - assert "Import license for managed: License12345." in result_no_item - - -@pytest.mark.asyncio -async def test_allocate_budget_no_department_and_large_values(): - """Covers lines 250, 255.""" - result_no_department = await allocate_budget("", 10000.00) - result_large_amount = await allocate_budget("Operations", 1e9) - assert "Allocated budget of $10000.00 to ." in result_no_department - assert "Allocated budget of $1000000000.00 to Operations." in result_large_amount - - -@pytest.mark.asyncio -async def test_track_procurement_metrics_empty_name(): - """Covers line 540.""" - result = await track_procurement_metrics("") - assert "Procurement metric '' tracked." in result - - -@pytest.mark.asyncio -async def test_order_hardware_missing_name_and_zero_quantity(): - """Covers lines 98 and 108.""" - result_missing_name = await order_hardware("", 10) - result_zero_quantity = await order_hardware("Keyboard", 0) - assert "Ordered 10 units of ." in result_missing_name - assert "Ordered 0 units of Keyboard." in result_zero_quantity - - -@pytest.mark.asyncio -async def test_process_purchase_order_empty_number(): - """Covers line 133.""" - result = await process_purchase_order("") - assert "Purchase Order has been processed." 
in result - - -@pytest.mark.asyncio -async def test_initiate_contract_negotiation_empty_vendor_and_details(): - """Covers lines 143, 148.""" - result_empty_vendor = await initiate_contract_negotiation("", "Details") - result_empty_details = await initiate_contract_negotiation("VendorX", "") - assert "Contract negotiation initiated with : Details" in result_empty_vendor - assert "Contract negotiation initiated with VendorX: " in result_empty_details - - -@pytest.mark.asyncio -async def test_manage_vendor_relationship_unexpected_action(): - """Covers line 153.""" - result = await manage_vendor_relationship("VendorZ", "undefined") - assert "Vendor relationship with VendorZ has been undefined." in result - - -@pytest.mark.asyncio -async def test_handle_return_zero_and_negative_quantity(): - """Covers lines 173, 178.""" - result_zero = await handle_return("Monitor", 0, "No issue") - result_negative = await handle_return("Monitor", -5, "Damaged") - assert "Processed return of 0 units of Monitor due to No issue." in result_zero - assert "Processed return of -5 units of Monitor due to Damaged." in result_negative - - -@pytest.mark.asyncio -async def test_process_payment_large_amount_and_no_vendor_name(): - """Covers line 188.""" - result_large_amount = await process_payment("VendorX", 1e7) - result_no_vendor = await process_payment("", 500.00) - assert "Processed payment of $10000000.00 to VendorX." in result_large_amount - assert "Processed payment of $500.00 to ." in result_no_vendor - - -@pytest.mark.asyncio -async def test_request_quote_zero_and_negative_quantity(): - """Covers lines 193, 198.""" - result_zero = await request_quote("Tablet", 0) - result_negative = await request_quote("Tablet", -10) - assert "Requested quote for 0 units of Tablet." in result_zero - assert "Requested quote for -10 units of Tablet." in result_negative - - -@pytest.mark.asyncio -async def test_track_procurement_metrics_with_invalid_input(): - """Covers edge cases for tracking metrics.""" - result_empty = await track_procurement_metrics("") - result_invalid = await track_procurement_metrics("InvalidMetricName") - assert "Procurement metric '' tracked." in result_empty - assert "Procurement metric 'InvalidMetricName' tracked." in result_invalid - - -@pytest.mark.asyncio -async def test_order_hardware_invalid_cases(): - """Covers invalid inputs for order_hardware.""" - result_no_name = await order_hardware("", 5) - result_negative_quantity = await order_hardware("Laptop", -10) - assert "Ordered 5 units of ." in result_no_name - assert "Ordered -10 units of Laptop." in result_negative_quantity - - -@pytest.mark.asyncio -async def test_order_software_license_invalid_cases(): - """Covers invalid inputs for order_software_license.""" - result_empty_type = await order_software_license("Photoshop", "", 5) - result_zero_quantity = await order_software_license("Photoshop", "Single User", 0) - assert "Ordered 5 licenses of Photoshop." in result_empty_type - assert "Ordered 0 Single User licenses of Photoshop." 
in result_zero_quantity +# --- Parameterized tests for Procurement functions --- +@pytest.mark.asyncio +@pytest.mark.parametrize( + "func, args, expected", + [ + (order_hardware, ("Laptop", 3), "Ordered 3 units of Laptop."), + (order_software_license, ("OfficeSuite", "Enterprise", 5), "Ordered 5 Enterprise licenses of OfficeSuite."), + (check_inventory, ("Monitor",), "Inventory status of Monitor: In Stock."), + (process_purchase_order, ("PO123",), "Purchase Order PO123 has been processed."), + (initiate_contract_negotiation, ("VendorX", "Exclusive deal"), "Contract negotiation initiated with VendorX: Exclusive deal"), + (approve_invoice, ("INV001",), "Invoice INV001 approved for payment."), + (track_order, ("ORDER001",), "Order ORDER001 is currently in transit."), + (manage_vendor_relationship, ("VendorY", "improved"), "Vendor relationship with VendorY has been improved."), + (update_procurement_policy, ("Policy1", "New Terms"), "Procurement policy 'Policy1' updated."), + (generate_procurement_report, ("Summary",), "Generated Summary procurement report."), + (evaluate_supplier_performance, ("SupplierA",), "Performance evaluation for supplier SupplierA completed."), + (handle_return, ("Printer", 2, "Defective"), "Processed return of 2 units of Printer due to Defective."), + (process_payment, ("VendorZ", 999.99), "Processed payment of $999.99 to VendorZ."), + (request_quote, ("Server", 4), "Requested quote for 4 units of Server."), + (recommend_sourcing_options, ("Router",), "Sourcing options for Router have been provided."), + (update_asset_register, ("Asset1", "Details"), "Asset register updated for Asset1: Details"), + (conduct_market_research, ("Electronics",), "Market research conducted for category: Electronics"), + # For get_procurement_information, we now expect the returned text to contain a known substring. 
+ (get_procurement_information, ("Any query",), "Contoso's Procurement Policies and Procedures"), + (schedule_maintenance, ("Printer", "2023-07-01"), "Scheduled maintenance for Printer on 2023-07-01."), + (audit_inventory, (), "Inventory audit has been conducted."), + (approve_budget, ("BUD001", 2000.0), "Approved budget ID BUD001 for amount $2000.00."), + (manage_warranty, ("Laptop", "1 year"), "Warranty for Laptop managed for period 1 year."), + (handle_customs_clearance, ("SHIP001",), "Customs clearance for shipment ID SHIP001 handled."), + (negotiate_discount, ("VendorQ", 10.0), "Negotiated a 10.0% discount with vendor VendorQ."), + (register_new_vendor, ("VendorNew", "Details"), "New vendor VendorNew registered with details: Details."), + (decommission_asset, ("Old Printer",), "Asset Old Printer has been decommissioned."), + (schedule_training, ("Procurement Basics", "2023-08-15"), "Training session 'Procurement Basics' scheduled on 2023-08-15."), + (update_vendor_rating, ("VendorR", 4.5), "Vendor VendorR rating updated to 4.5."), + (handle_recall, ("Monitor", "Faulty display"), "Recall of Monitor due to Faulty display handled."), + (request_samples, ("Keyboard", 3), "Requested 3 samples of Keyboard."), + (manage_subscription, ("CloudService", "activated"), "Subscription to CloudService has been activated."), + (verify_supplier_certification, ("SupplierZ",), "Certification status of supplier SupplierZ verified."), + (conduct_supplier_audit, ("SupplierZ",), "Audit of supplier SupplierZ conducted."), + (manage_import_licenses, ("ItemX", "License Info"), "Import license for ItemX managed: License Info."), + (conduct_cost_analysis, ("ItemY",), "Cost analysis for ItemY conducted."), + (evaluate_risk_factors, ("ItemZ",), "Risk factors for ItemZ evaluated."), + (manage_green_procurement_policy, ("Eco Policy",), "Green procurement policy managed: Eco Policy."), + (update_supplier_database, ("SupplierM", "New Info"), "Supplier database updated for SupplierM: New Info."), + (handle_dispute_resolution, ("VendorP", "Late delivery"), "Dispute with vendor VendorP over issue 'Late delivery' resolved."), + (assess_compliance, ("ItemQ", "ISO standards"), "Compliance of ItemQ with standards 'ISO standards' assessed."), + (manage_reverse_logistics, ("ItemR", 5), "Reverse logistics managed for 5 units of ItemR."), + (verify_delivery, ("ItemS", "Delivered"), "Delivery status of ItemS verified as Delivered."), + (handle_procurement_risk_assessment, ("Risk details",), "Procurement risk assessment handled: Risk details."), + (manage_supplier_contract, ("VendorT", "renewed"), "Supplier contract with VendorT has been renewed."), + (allocate_budget, ("DeptX", 1500.0), "Allocated budget of $1500.00 to DeptX."), + (track_procurement_metrics, ("Metric1",), "Procurement metric 'Metric1' tracked."), + (manage_inventory_levels, ("ItemU", "increased"), "Inventory levels for ItemU have been increased."), + (conduct_supplier_survey, ("SupplierV",), "Survey of supplier SupplierV conducted."), + ], +) +async def test_procurement_functions(func, args, expected): + result = await func(*args) + # For get_procurement_information, check for substring instead of full equality. 
+ if func.__name__ == "get_procurement_information": + assert expected in result + else: + assert result == expected + + +# --- Test get_procurement_tools --- +def test_get_procurement_tools(): + tools = get_procurement_tools() + from autogen_core.components.tools import FunctionTool + assert isinstance(tools, list) + assert len(tools) > 0 + assert any(isinstance(tool, FunctionTool) for tool in tools) + names = [tool.name for tool in tools] + # Check that one of the expected tool names is present. + assert "order_hardware" in names diff --git a/src/backend/tests/agents/test_product.py b/src/backend/tests/agents/test_product.py index 4437cd751..299eaedbf 100644 --- a/src/backend/tests/agents/test_product.py +++ b/src/backend/tests/agents/test_product.py @@ -1,12 +1,18 @@ import os import sys from unittest.mock import MagicMock -import pytest -# Mock Azure SDK dependencies +sys.modules["azure.monitor.events"] = MagicMock() sys.modules["azure.monitor.events.extension"] = MagicMock() -# Set up environment variables + +import pytest + + +# Adjust sys.path so that the project root is found. +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../"))) + +# Set required environment variables before importing modules that depend on them. os.environ["COSMOSDB_ENDPOINT"] = "https://mock-endpoint" os.environ["COSMOSDB_KEY"] = "mock-key" os.environ["COSMOSDB_DATABASE"] = "mock-database" @@ -15,12 +21,13 @@ os.environ["AZURE_OPENAI_API_VERSION"] = "2023-01-01" os.environ["AZURE_OPENAI_ENDPOINT"] = "https://mock-openai-endpoint" - -# Import the required functions for testing +# Import product functions and classes. from src.backend.agents.product import ( add_mobile_extras_pack, get_product_info, + get_billing_date, update_inventory, + add_new_product, schedule_product_launch, analyze_sales_data, get_customer_feedback, @@ -33,33 +40,62 @@ manage_supply_chain, forecast_product_demand, handle_product_complaints, - monitor_market_trends, generate_product_report, develop_new_product_ideas, optimize_product_page, track_product_shipment, + coordinate_with_marketing, + review_product_quality, + collaborate_with_tech_team, + update_product_description, + manage_product_returns, + conduct_product_survey, + update_product_specifications, + organize_product_photoshoot, + manage_product_listing, + set_product_availability, + coordinate_with_logistics, + calculate_product_margin, + update_product_category, + manage_product_bundles, + monitor_product_performance, + handle_product_pricing, + develop_product_training_material, + update_product_labels, + manage_product_warranty, + handle_product_licensing, + manage_product_packaging, + set_product_safety_standards, + develop_product_features, evaluate_product_performance, + manage_custom_product_orders, + update_product_images, + handle_product_obsolescence, + manage_product_sku, + provide_product_training, + get_product_tools, ) +from autogen_core.components.tools import FunctionTool + -# Parameterized tests for repetitive cases @pytest.mark.asyncio @pytest.mark.parametrize( "function, args, expected_substrings", [ - (add_mobile_extras_pack, ("Roaming Pack", "2025-01-01"), ["Roaming Pack", "2025-01-01"]), + (add_mobile_extras_pack, ("Roaming Pack", "2025-01-01"), ["Roaming Pack", "2025-01-01", "AGENT SUMMARY:"]), (get_product_info, (), ["Simulated Phone Plans", "Plan A"]), (update_inventory, ("Product A", 50), ["Inventory for", "Product A"]), (schedule_product_launch, ("New Product", "2025-02-01"), ["New Product", "2025-02-01"]), 
(analyze_sales_data, ("Product B", "Last Quarter"), ["Sales data for", "Product B"]), (get_customer_feedback, ("Product C",), ["Customer feedback for", "Product C"]), (manage_promotions, ("Product A", "10% off for summer"), ["Promotion for", "Product A"]), - (handle_product_recall, ("Product A", "Defective batch"), ["Product recall for", "Defective batch"]), - (set_product_discount, ("Product A", 15.0), ["Discount for", "15.0%"]), - (manage_supply_chain, ("Product A", "Supplier X"), ["Supply chain for", "Supplier X"]), (check_inventory, ("Product A",), ["Inventory status for", "Product A"]), (update_product_price, ("Product A", 99.99), ["Price for", "$99.99"]), (provide_product_recommendations, ("High Performance",), ["Product recommendations", "High Performance"]), + (handle_product_recall, ("Product A", "Defective batch"), ["Product recall for", "Defective batch"]), + (set_product_discount, ("Product A", 15.0), ["Discount for", "15.0%"]), + (manage_supply_chain, ("Product A", "Supplier X"), ["Supply chain for", "Supplier X"]), (forecast_product_demand, ("Product A", "Next Month"), ["Demand for", "Next Month"]), (handle_product_complaints, ("Product A", "Complaint about quality"), ["Complaint for", "Product A"]), (generate_product_report, ("Product A", "Sales"), ["Sales report for", "Product A"]), @@ -75,8 +111,54 @@ async def test_product_functions(function, args, expected_substrings): assert substring in result -# Specific test for monitoring market trends +# --- Extra parameterized tests for remaining functions --- @pytest.mark.asyncio -async def test_monitor_market_trends(): - result = await monitor_market_trends() - assert "Market trends monitored" in result +@pytest.mark.parametrize( + "function, args, expected_substrings", + [ + (get_billing_date, (), ["Billing Date"]), + (add_new_product, ("New smartwatch with health tracking.",), ["New Product Added", "New smartwatch"]), + (coordinate_with_marketing, ("Smartphone", "Campaign XYZ"), ["Marketing Coordination", "Campaign XYZ"]), + (review_product_quality, ("Monitor",), ["Quality review", "Monitor"]), + (collaborate_with_tech_team, ("Drone", "Improve battery efficiency"), ["Tech Team Collaboration", "Improve battery"]), + (update_product_description, ("Smartwatch", "Sleek design"), ["Product Description Updated", "Sleek design"]), + (manage_product_returns, ("Printer", "Paper jam"), ["Product Return Managed", "Paper jam"]), + (conduct_product_survey, ("Monitor", "Online survey"), ["Product Survey Conducted", "Online survey"]), + (update_product_specifications, ("TV", "1080p, 60Hz"), ["Product Specifications Updated", "1080p, 60Hz"]), + (organize_product_photoshoot, ("Camera", "2023-06-01"), ["Photoshoot Organized", "2023-06-01"]), + (manage_product_listing, ("Tablet", "Listed on Amazon"), ["Product Listing Managed", "Amazon"]), + (set_product_availability, ("Laptop", True), ["available"]), + (set_product_availability, ("Laptop", False), ["unavailable"]), + (coordinate_with_logistics, ("Speaker", "Pickup scheduled"), ["Logistics Coordination", "Pickup scheduled"]), + (calculate_product_margin, ("Laptop", 500, 1000), ["Profit margin", "50.00%"]), + (update_product_category, ("Phone", "Electronics"), ["Product Category Updated", "Electronics"]), + (manage_product_bundles, ("Bundle1", ["Phone", "Charger"]), ["Product Bundle Managed", "Phone", "Charger"]), + (monitor_product_performance, ("Camera",), ["Product Performance Monitored", "Camera"]), + (handle_product_pricing, ("TV", "Dynamic pricing"), ["Pricing Strategy Set", "Dynamic 
pricing"]), + (develop_product_training_material, ("Router", "Video tutorial"), ["Training Material Developed", "Video tutorial"]), + (update_product_labels, ("Smartphone", "New, Hot"), ["Product Labels Updated", "New, Hot"]), + (manage_product_warranty, ("Laptop", "2-year warranty"), ["Product Warranty Managed", "2-year warranty"]), + (handle_product_licensing, ("Software", "GPL License"), ["Product Licensing Handled", "GPL License"]), + (manage_product_packaging, ("Laptop", "Eco-friendly packaging"), ["Product Packaging Managed", "Eco-friendly packaging"]), + (set_product_safety_standards, ("Refrigerator", "ISO 9001"), ["Safety standards", "ISO 9001"]), + (develop_product_features, ("Smart TV", "Voice control, facial recognition"), ["New Features Developed", "Voice control"]), + (manage_custom_product_orders, ("Custom engraving required",), ["Custom Product Order Managed", "Custom engraving"]), + (update_product_images, ("Camera", ["http://example.com/img1.jpg", "http://example.com/img2.jpg"]), ["Product Images Updated", "img1.jpg", "img2.jpg"]), + (handle_product_obsolescence, ("DVD Player",), ["Product Obsolescence Handled", "DVD Player"]), + (manage_product_sku, ("Phone", "SKU12345"), ["SKU Managed", "SKU12345"]), + (provide_product_training, ("Tablet", "In-person training session"), ["Product Training Provided", "In-person training session"]), + ], +) +async def test_product_functions_extra(function, args, expected_substrings): + result = await function(*args) + for substring in expected_substrings: + assert substring in result + + +# --- Test get_product_tools --- +def test_get_product_tools(): + tools = get_product_tools() + assert isinstance(tools, list) + assert any(isinstance(tool, FunctionTool) for tool in tools) + names = [tool.name for tool in tools] + assert "add_mobile_extras_pack" in names or "get_product_info" in names diff --git a/src/backend/tests/context/test_cosmos_memory.py b/src/backend/tests/context/test_cosmos_memory.py index 441bb1ef1..9b6a75190 100644 --- a/src/backend/tests/context/test_cosmos_memory.py +++ b/src/backend/tests/context/test_cosmos_memory.py @@ -1,68 +1,351 @@ +import os +import sys +import asyncio import pytest -from unittest.mock import AsyncMock, patch -from azure.cosmos.partition_key import PartitionKey + +# Adjust sys.path so that the project root is found. +# Test file location: src/backend/tests/context/test_cosmos_memory.py +# Project root is three levels up. +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../"))) + +# Set required environment variables before importing modules that depend on them. 
+os.environ["COSMOSDB_ENDPOINT"] = "https://mock-endpoint" +os.environ["COSMOSDB_KEY"] = "mock-key" +os.environ["COSMOSDB_DATABASE"] = "mock-database" +os.environ["COSMOSDB_CONTAINER"] = "mock-container" +os.environ["AZURE_OPENAI_DEPLOYMENT_NAME"] = "mock-deployment-name" +os.environ["AZURE_OPENAI_API_VERSION"] = "2023-01-01" +os.environ["AZURE_OPENAI_ENDPOINT"] = "https://mock-openai-endpoint" + + from src.backend.context.cosmos_memory import CosmosBufferedChatCompletionContext +from src.backend.models.messages import BaseDataModel -# Helper to create async iterable -async def async_iterable(mock_items): - """Helper to create an async iterable.""" - for item in mock_items: - yield item +# --- DummyModel for Testing --- +class DummyModel(BaseDataModel): + id: str + session_id: str + data_type: str + user_id: str + def model_dump(self): + return { + "id": self.id, + "session_id": self.session_id, + "data_type": self.data_type, + "user_id": self.user_id, + } -@pytest.fixture -def mock_env_variables(monkeypatch): - """Mock all required environment variables.""" - env_vars = { - "COSMOSDB_ENDPOINT": "https://mock-endpoint", - "COSMOSDB_KEY": "mock-key", - "COSMOSDB_DATABASE": "mock-database", - "COSMOSDB_CONTAINER": "mock-container", - "AZURE_OPENAI_DEPLOYMENT_NAME": "mock-deployment-name", - "AZURE_OPENAI_API_VERSION": "2023-01-01", - "AZURE_OPENAI_ENDPOINT": "https://mock-openai-endpoint", - } - for key, value in env_vars.items(): - monkeypatch.setenv(key, value) + @classmethod + def model_validate(cls, data): + return DummyModel( + id=data["id"], + session_id=data["session_id"], + data_type=data.get("data_type", ""), + user_id=data["user_id"], + ) -@pytest.fixture -def mock_cosmos_client(): - """Fixture for mocking Cosmos DB client and container.""" - mock_client = AsyncMock() - mock_container = AsyncMock() - mock_client.create_container_if_not_exists.return_value = mock_container - - # Mocking context methods - mock_context = AsyncMock() - mock_context.store_message = AsyncMock() - mock_context.retrieve_messages = AsyncMock( - return_value=async_iterable([{"id": "test_id", "content": "test_content"}]) - ) +# --- FakeContainer to simulate Cosmos DB behavior --- +class FakeContainer: + def __init__(self, items=None): + self.items = items if items is not None else [] + + async def create_item(self, body): + self.items.append(body) + return body + + async def upsert_item(self, body): + self.items = [item for item in self.items if item.get("id") != body.get("id")] + self.items.append(body) + return body + + async def read_item(self, item, partition_key): + for doc in self.items: + if doc.get("id") == item and doc.get("session_id") == partition_key: + return doc + raise Exception("Item not found") + + def query_items(self, query, parameters, **kwargs): + async def gen(): + for item in self.items: + yield item + return gen() - return mock_client, mock_container, mock_context + async def delete_item(self, item, partition_key): + self.items = [doc for doc in self.items if doc.get("id") != item] + return +# --- Fixture: cosmos_context --- +# We define this as a normal (synchronous) fixture so that it returns an actual instance. 
 @pytest.fixture
-def mock_config(mock_cosmos_client):
-    """Fixture to patch Config with mock Cosmos DB client."""
-    mock_client, _, _ = mock_cosmos_client
-    with patch(
-        "src.backend.config.Config.GetCosmosDatabaseClient", return_value=mock_client
-    ), patch("src.backend.config.Config.COSMOSDB_CONTAINER", "mock-container"):
-        yield
-
-
-@pytest.mark.asyncio
-async def test_initialize(mock_config, mock_cosmos_client):
-    """Test if the Cosmos DB container is initialized correctly."""
-    mock_client, mock_container, _ = mock_cosmos_client
-    context = CosmosBufferedChatCompletionContext(
-        session_id="test_session", user_id="test_user"
-    )
-    await context.initialize()
-    mock_client.create_container_if_not_exists.assert_called_once_with(
-        id="mock-container", partition_key=PartitionKey(path="/session_id")
+def cosmos_context(monkeypatch):
+    # Patch asyncio.create_task to a no-op so that __init__ does not schedule initialize().
+    monkeypatch.setattr(asyncio, "create_task", lambda coro, **kwargs: None)
+    ctx = CosmosBufferedChatCompletionContext("test_session", "test_user", buffer_size=10)
+    fake_container = FakeContainer()
+    ctx._container = fake_container
+    # Manually set the initialization event.
+    ctx._initialized.set()
+    return ctx
+
+
+# Mark all tests in this module as async tests.
+pytestmark = pytest.mark.asyncio
+
+
+async def test_initialize(monkeypatch):
+    """Test that initialize() creates the container and sets the event."""
+    fake_container = FakeContainer()
+
+    async def fake_create_container_if_not_exists(id, partition_key):
+        return fake_container
+    monkeypatch.setattr(
+        "src.backend.context.cosmos_memory.Config.GetCosmosDatabaseClient",
+        lambda: type("FakeDB", (), {"create_container_if_not_exists": fake_create_container_if_not_exists})
     )
-    assert context._container == mock_container
+    monkeypatch.setattr("src.backend.context.cosmos_memory.Config.COSMOSDB_CONTAINER", "mock-container")
+    # For this test, let asyncio.create_task schedule normally.
+    monkeypatch.setattr(asyncio, "create_task", lambda coro, **kwargs: asyncio.get_running_loop().create_task(coro))
+    ctx = CosmosBufferedChatCompletionContext("s", "u", buffer_size=10)
+    await ctx.initialize()
+    assert ctx._container is fake_container
+
+
+async def test_add_item_success(cosmos_context):
+    dummy = DummyModel(id="dummy1", session_id="test_session", data_type="plan", user_id="test_user")
+    await cosmos_context.add_item(dummy)
+    assert any(item["id"] == "dummy1" for item in cosmos_context._container.items)
+
+
+async def test_add_item_failure(cosmos_context, monkeypatch):
+    dummy = DummyModel(id="dummy2", session_id="test_session", data_type="plan", user_id="test_user")
+
+    async def fake_create_item(body):
+        raise Exception("failure")
+    monkeypatch.setattr(cosmos_context._container, "create_item", fake_create_item)
+    # Exception is caught internally; no exception propagates.
+    await cosmos_context.add_item(dummy)
+
+
+async def test_update_item_success(cosmos_context):
+    dummy = DummyModel(id="dummy3", session_id="test_session", data_type="plan", user_id="test_user")
+    await cosmos_context.update_item(dummy)
+    assert any(item["id"] == "dummy3" for item in cosmos_context._container.items)
+
+
+async def test_update_item_failure(cosmos_context, monkeypatch):
+    dummy = DummyModel(id="dummy4", session_id="test_session", data_type="plan", user_id="test_user")
+
+    async def fake_upsert_item(body):
+        raise Exception("failure")
+    monkeypatch.setattr(cosmos_context._container, "upsert_item", fake_upsert_item)
+    await cosmos_context.update_item(dummy)
+
+
+async def test_get_item_by_id_success(cosmos_context):
+    doc = {"id": "exists", "session_id": "test_partition", "data_type": "plan", "user_id": "test"}
+    cosmos_context._container.items.append(doc)
+    item = await cosmos_context.get_item_by_id("exists", "test_partition", DummyModel)
+    assert item is not None
+    assert item.id == "exists"
+
+
+async def test_get_item_by_id_failure(cosmos_context):
+    item = await cosmos_context.get_item_by_id("nonexistent", "test_partition", DummyModel)
+    assert item is None
+
+
+async def test_query_items_failure(cosmos_context, monkeypatch):
+    monkeypatch.setattr(cosmos_context._container, "query_items",
+                        lambda query, parameters, **kwargs: (_ for _ in ()).throw(Exception("fail")))
+    result = await cosmos_context.query_items("dummy", [{"name": "param", "value": "val"}], DummyModel)
+    assert result == []
+
+
+async def test_add_session(cosmos_context):
+    session = DummyModel(id="sess1", session_id="test_session", data_type="session", user_id="test_user")
+    await cosmos_context.add_session(session)
+    assert any(item["id"] == "sess1" for item in cosmos_context._container.items)
+
+
+async def test_get_session_not_found(cosmos_context, monkeypatch):
+    async def empty_gen():
+        if False:
+            yield {}
+    monkeypatch.setattr(cosmos_context._container, "query_items",
+                        lambda query, parameters, **kwargs: empty_gen())
+    session = await cosmos_context.get_session("nonexistent")
+    assert session is None
+
+
+async def test_add_plan(cosmos_context):
+    plan = DummyModel(id="plan1", session_id="test_session", data_type="plan", user_id="test_user")
+    await cosmos_context.add_plan(plan)
+    assert any(item["id"] == "plan1" for item in cosmos_context._container.items)
+
+
+async def test_update_plan(cosmos_context):
+    plan = DummyModel(id="plan1", session_id="test_session", data_type="plan", user_id="test_user")
+    await cosmos_context.update_plan(plan)
+    assert any(item["id"] == "plan1" for item in cosmos_context._container.items)
+
+
+async def test_add_step(cosmos_context):
+    step = DummyModel(id="step1", session_id="test_session", data_type="step", user_id="test_user")
+    await cosmos_context.add_step(step)
+    assert any(item["id"] == "step1" for item in cosmos_context._container.items)
+
+
+async def test_update_step(cosmos_context):
+    step = DummyModel(id="step1", session_id="test_session", data_type="step", user_id="test_user")
+    await cosmos_context.update_step(step)
+    assert any(item["id"] == "step1" for item in cosmos_context._container.items)
+
+
+# --- Tests for Messages Methods ---
+class DummyLLMMessage:
+    def dict(self):
+        return {"type": "UserMessage", "content": "hello"}
+
+
+async def test_get_messages_failure(cosmos_context, monkeypatch):
+    monkeypatch.setattr(cosmos_context._container, "query_items",
+                        lambda query, parameters, **kwargs: (_ for _ in ()).throw(Exception("fail")))
+    messages = await cosmos_context.get_messages()
+    assert messages == []
+
+
+async def test_get_data_by_type_failure(cosmos_context, monkeypatch):
+    monkeypatch.setattr(cosmos_context._container, "query_items",
+                        lambda query, parameters, **kwargs: (_ for _ in ()).throw(Exception("fail")))
+    data = await cosmos_context.get_data_by_type("plan")
+    assert data == []
+
+
+# --- Utility Methods Tests ---
+async def test_delete_item(cosmos_context):
+    cosmos_context._container.items.append({"id": "del1", "session_id": "test_session"})
+    await cosmos_context.delete_item("del1", "test_session")
+    assert not any(item["id"] == "del1" for item in cosmos_context._container.items)
+
+
+async def test_delete_items_by_query(cosmos_context, monkeypatch):
+    async def gen():
+        yield {"id": "del1", "session_id": "test_session"}
+        yield {"id": "del2", "session_id": "test_session"}
+    monkeypatch.setattr(cosmos_context._container, "query_items",
+                        lambda query, parameters, **kwargs: gen())
+    calls = []
+
+    async def fake_delete_item(item, partition_key):
+        calls.append((item, partition_key))
+    monkeypatch.setattr(cosmos_context._container, "delete_item", fake_delete_item)
+    await cosmos_context.delete_items_by_query("query", [{"name": "param", "value": "val"}])
+    assert len(calls) == 2
+
+
+async def test_delete_all_messages(cosmos_context, monkeypatch):
+    async def gen():
+        yield {"id": "msg1", "session_id": "test_session"}
+        yield {"id": "msg2", "session_id": "test_session"}
+    monkeypatch.setattr(cosmos_context._container, "query_items",
+                        lambda query, parameters, **kwargs: gen())
+    calls = []
+
+    async def fake_delete_item(item, partition_key):
+        calls.append((item, partition_key))
+    monkeypatch.setattr(cosmos_context._container, "delete_item", fake_delete_item)
+    await cosmos_context.delete_all_messages("message")
+    assert len(calls) == 2
+
+
+async def test_get_all_messages_failure(cosmos_context, monkeypatch):
+    monkeypatch.setattr(cosmos_context._container, "query_items",
+                        lambda query, parameters, **kwargs: (_ for _ in ()).throw(Exception("fail")))
+    messages = await cosmos_context.get_all_messages()
+    assert messages == []
+
+
+async def test_close(cosmos_context):
+    await cosmos_context.close()
+
+
+async def test_context_manager(cosmos_context):
+    async with cosmos_context as ctx:
+        assert ctx == cosmos_context
+
+
+async def test_get_all_sessions_failure(cosmos_context, monkeypatch):
+    """Simulate an exception during query_items in get_all_sessions, which should return an empty list."""
+    monkeypatch.setattr(cosmos_context._container, "query_items",
+                        lambda query, parameters, **kwargs: (_ for _ in ()).throw(Exception("fail")))
+    sessions = await cosmos_context.get_all_sessions()
+    assert sessions == []
+
+
+async def test_get_plan_by_session_not_found(cosmos_context, monkeypatch):
+    """Simulate query_items returning no plans, so get_plan_by_session returns None."""
+    async def empty_gen():
+        if False:
+            yield {}
+    monkeypatch.setattr(cosmos_context._container, "query_items",
+                        lambda query, parameters, **kwargs: empty_gen())
+    plan = await cosmos_context.get_plan_by_session("nonexistent")
+    assert plan is None
+
+
+async def test_get_all_plans_failure(cosmos_context, monkeypatch):
+    """Simulate exception in query_items when calling get_all_plans; should return empty list."""
+    monkeypatch.setattr(cosmos_context._container, "query_items",
+                        lambda query, parameters, **kwargs: (_ for _ in ()).throw(Exception("fail")))
+    plans = await cosmos_context.get_all_plans()
+    assert plans == []
+
+
+async def test_get_messages_unrecognized(cosmos_context, monkeypatch):
+    """Test get_messages() when an item has an unrecognized message type so it is skipped."""
+    async def gen():
+        yield {"id": "msg_unknown", "session_id": "test_session", "data_type": "message",
+               "content": {"type": "UnknownType", "content": "ignored"}, "_ts": 50}
+    monkeypatch.setattr(cosmos_context._container, "query_items",
+                        lambda query, parameters, **kwargs: gen())
+    messages = await cosmos_context.get_messages()
+    # Since the type is unknown, the message should be skipped.
+    assert messages == []
+
+
+async def test_delete_item_failure(cosmos_context, monkeypatch):
+    """Simulate an exception in delete_item so that delete_item() logs and does not propagate."""
+    async def fake_delete_item(item, partition_key):
+        raise Exception("delete failure")
+    monkeypatch.setattr(cosmos_context._container, "delete_item", fake_delete_item)
+    # Calling delete_item should not raise; it catches exception internally.
+    await cosmos_context.delete_item("any", "any")
+
+
+async def test_delete_items_by_query_failure(cosmos_context, monkeypatch):
+    """Simulate an exception in query_items within delete_items_by_query and ensure it is caught."""
+    monkeypatch.setattr(cosmos_context._container, "query_items",
+                        lambda query, parameters, **kwargs: (_ for _ in ()).throw(Exception("fail")))
+    # delete_items_by_query should catch the exception and not propagate.
+    await cosmos_context.delete_items_by_query("query", [{"name": "param", "value": "val"}])
+
+
+async def test_get_all_messages_success(cosmos_context, monkeypatch):
+    async def gen():
+        yield {"id": "msg1", "session_id": "test_session", "data_type": "message", "content": "hello", "_ts": 40}
+    monkeypatch.setattr(cosmos_context._container, "query_items",
+                        lambda query, parameters, **kwargs: gen())
+    messages = await cosmos_context.get_all_messages()
+    assert len(messages) == 1
+    assert messages[0]["id"] == "msg1"
+
+
+async def test_get_all_messages_exception(cosmos_context, monkeypatch):
+    monkeypatch.setattr(cosmos_context._container, "query_items",
+                        lambda query, parameters, **kwargs: (_ for _ in ()).throw(Exception("fail")))
+    messages = await cosmos_context.get_all_messages()
+    assert messages == []
diff --git a/src/backend/tests/test_app.py b/src/backend/tests/test_app.py
index 04b57c7d5..808791c73 100644
--- a/src/backend/tests/test_app.py
+++ b/src/backend/tests/test_app.py
@@ -1,15 +1,17 @@
 import os
 import sys
-from unittest.mock import MagicMock, patch
 import pytest
+from unittest.mock import MagicMock, patch
 from fastapi.testclient import TestClient
 
-# Mock Azure dependencies to prevent import errors
+
+# --- MOCK EXTERNAL DEPENDENCIES ---
+# Prevent import errors for Azure modules.
sys.modules["azure.monitor"] = MagicMock() sys.modules["azure.monitor.events.extension"] = MagicMock() sys.modules["azure.monitor.opentelemetry"] = MagicMock() -# Mock environment variables before importing app +# Set required environment variables os.environ["COSMOSDB_ENDPOINT"] = "https://mock-endpoint" os.environ["COSMOSDB_KEY"] = "mock-key" os.environ["COSMOSDB_DATABASE"] = "mock-database" @@ -21,68 +23,302 @@ os.environ["AZURE_OPENAI_API_VERSION"] = "2023-01-01" os.environ["AZURE_OPENAI_ENDPOINT"] = "https://mock-openai-endpoint" -# Mock telemetry initialization to prevent errors +# Prevent telemetry initialization errors with patch("azure.monitor.opentelemetry.configure_azure_monitor", MagicMock()): from src.backend.app import app -# Initialize FastAPI test client client = TestClient(app) +class FakePlan: + id = "fake_plan_id" + summary = "Fake plan summary" + + +class FakeRuntime: + async def send_message(self, message, agent_id): + return FakePlan() + + +# Allow any arguments so that both (session_id, user_id) and keyword usage work. +async def fake_initialize_runtime_and_context(*args, **kwargs): + return FakeRuntime(), None + + +# Our Fake Cosmos returns dictionaries that fully satisfy our Pydantic models. +class FakeCosmos: + def __init__(self, session_id: str, user_id: str): + self.session_id = session_id + self.user_id = user_id + + async def get_plan_by_session(self, session_id: str): + if session_id == "existing": + user_id = self.user_id # capture from the outer instance + + class FakePlanBySession: + id = "existing_plan_id" + + def model_dump(inner_self): + return { + "id": inner_self.id, + "session_id": session_id, + "initial_goal": "Test goal", + "overall_status": "in_progress", + "user_id": user_id, + } + return FakePlanBySession() + return None + + async def get_steps_by_plan(self, plan_id: str): + return [{ + "id": "step1", + "plan_id": plan_id, + "action": "Test action", + "agent": "TechSupportAgent", # Allowed enum value + "status": "planned", + "session_id": self.session_id, + "user_id": self.user_id, + }] + + async def get_all_plans(self): + user_id = self.user_id + + class FakePlanAll: + id = "plan1" + + def model_dump(inner_self): + return { + "id": inner_self.id, + "session_id": "sess1", + "initial_goal": "Goal1", + "overall_status": "completed", + "user_id": user_id, + } + return [FakePlanAll()] + + async def get_data_by_type(self, type_str: str): + return [{ + "id": "agent_msg1", + "session_id": self.session_id, + "plan_id": "plan1", + "content": "Fake agent message", + "source": "TechSupportAgent", + "ts": 123456789, + "step_id": "step1", + "user_id": self.user_id, + }] + + async def delete_all_messages(self, type_str: str): + return + + async def get_all_messages(self): + return [{ + "id": "msg1", + "data_type": "plan", + "session_id": "sess1", + "user_id": self.user_id, + "content": "Test content", + "ts": 123456789, + }] + + @pytest.fixture(autouse=True) -def mock_dependencies(monkeypatch): - """Mock dependencies to simplify tests.""" +def override_dependencies(monkeypatch): + # Override authentication so that the headers always yield a valid user. 
     monkeypatch.setattr(
         "src.backend.auth.auth_utils.get_authenticated_user_details",
         lambda headers: {"user_principal_id": "mock-user-id"},
     )
+
     monkeypatch.setattr(
         "src.backend.utils.retrieve_all_agent_tools",
-        lambda: [{"agent": "test_agent", "function": "test_function"}],
+        lambda: [{
+            "agent": "TechSupportAgent",
+            "function": "test_function",
+            "description": "desc",
+            "arguments": "args"
+        }],
     )
+    monkeypatch.setattr("src.backend.app.initialize_runtime_and_context", fake_initialize_runtime_and_context)
+    monkeypatch.setattr("src.backend.app.CosmosBufferedChatCompletionContext", FakeCosmos)
+    monkeypatch.setattr("src.backend.app.track_event_if_configured", lambda event, props: None)
 
 
 def test_input_task_invalid_json():
-    """Test the case where the input JSON is invalid."""
     invalid_json = "Invalid JSON data"
     headers = {"Authorization": "Bearer mock-token"}
     response = client.post("/input_task", data=invalid_json, headers=headers)
-
-    # Assert response for invalid JSON
     assert response.status_code == 422
     assert "detail" in response.json()
 
 
 def test_input_task_missing_description():
-    """Test the case where the input task description is missing."""
-    input_task = {
-        "session_id": None,
-        "user_id": "mock-user-id",
+    payload = {"session_id": ""}
+    headers = {"Authorization": "Bearer mock-token"}
+    response = client.post("/input_task", json=payload, headers=headers)
+    assert response.status_code == 422
+    assert "detail" in response.json()
+
+
+def test_human_feedback_valid():
+    payload = {
+        "step_id": "step1",
+        "plan_id": "plan1",
+        "session_id": "sess1",
+        "approved": True,
+        "human_feedback": "Feedback text",
+        "updated_action": "No change"
+    }
+    headers = {"Authorization": "Bearer mock-token"}
+    response = client.post("/human_feedback", json=payload, headers=headers)
+    assert response.status_code == 200
+    data = response.json()
+    assert data["status"] == "Feedback received"
+    assert data["session_id"] == payload["session_id"]
+    assert data["step_id"] == payload["step_id"]
+
+
+def test_human_clarification_valid():
+    payload = {
+        "plan_id": "plan1",
+        "session_id": "sess1",
+        "human_clarification": "Clarification details"
     }
+    headers = {"Authorization": "Bearer mock-token"}
+    response = client.post("/human_clarification_on_plan", json=payload, headers=headers)
+    assert response.status_code == 200
+    data = response.json()
+    assert data["status"] == "Clarification received"
+    assert data["session_id"] == payload["session_id"]
+
+
+def test_approve_step_with_step_id():
+    payload = {
+        "step_id": "step1",
+        "plan_id": "plan1",
+        "session_id": "sess1",
+        "approved": True,
+        "human_feedback": "Approved",
+        "updated_action": "None"
+    }
     headers = {"Authorization": "Bearer mock-token"}
-    response = client.post("/input_task", json=input_task, headers=headers)
+    response = client.post("/approve_step_or_steps", json=payload, headers=headers)
+    assert response.status_code == 200
+    data = response.json()
+    assert "Step step1" in data["status"]
+
+
+def test_approve_all_steps():
+    payload = {
+        "step_id": "",
+        "plan_id": "plan1",
+        "session_id": "sess1",
+        "approved": True,
+        "human_feedback": "All approved",
+        "updated_action": "None"
+    }
+    headers = {"Authorization": "Bearer mock-token"}
+    response = client.post("/approve_step_or_steps", json=payload, headers=headers)
+    assert response.status_code == 200
+    data = response.json()
+    assert data["status"] == "All steps approved"
+
+
+def test_get_plans_with_session():
+    headers = {"Authorization": "Bearer mock-token"}
+    response = client.get("/plans", params={"session_id": "existing"}, headers=headers)
+    assert response.status_code == 200
+    data = response.json()
+    assert isinstance(data, list)
+    plan = data[0]
+    assert plan["id"] == "existing_plan_id"
+    assert "steps" in plan
+
+
+def test_get_plans_without_session():
+    headers = {"Authorization": "Bearer mock-token"}
+    response = client.get("/plans", headers=headers)
+    assert response.status_code == 200
+    data = response.json()
+    assert isinstance(data, list)
+    plan = data[0]
+    assert plan["id"] == "plan1"
+    assert "steps" in plan
-
-    # Assert response for missing description
-    assert response.status_code == 422
-    assert "detail" in response.json()
+
+
+def test_get_steps_by_plan():
+    headers = {"Authorization": "Bearer mock-token"}
+    response = client.get("/steps/plan1", headers=headers)
+    assert response.status_code == 200
+    data = response.json()
+    assert isinstance(data, list)
+    assert data[0]["plan_id"] == "plan1"
+
+
+def test_get_agent_messages():
+    headers = {"Authorization": "Bearer mock-token"}
+    response = client.get("/agent_messages/sess1", headers=headers)
+    assert response.status_code == 200
+    data = response.json()
+    assert isinstance(data, list)
+    assert data[0]["session_id"] == "sess1"
+
+
+def test_delete_all_messages():
+    headers = {"Authorization": "Bearer mock-token"}
+    response = client.delete("/messages", headers=headers)
+    assert response.status_code == 200
+    data = response.json()
+    assert data["status"] == "All messages deleted"
+
+
+def test_get_all_messages():
+    headers = {"Authorization": "Bearer mock-token"}
+    response = client.get("/messages", headers=headers)
+    assert response.status_code == 200
+    data = response.json()
+    assert isinstance(data, list)
+    assert data[0]["data_type"] == "plan"
+
+
+def test_get_agent_tools():
+    response = client.get("/api/agent-tools")
+    assert response.status_code == 200
+    data = response.json()
+    assert isinstance(data, list)
+    # Our override now returns "TechSupportAgent"
+    assert data[0]["agent"] == "TechSupportAgent"
 
 
 def test_basic_endpoint():
-    """Test a basic endpoint to ensure the app runs."""
     response = client.get("/")
-    assert response.status_code == 404  # The root endpoint is not defined
+    assert response.status_code == 404
 
 
-def test_input_task_empty_description():
-    """Tests if /input_task handles an empty description."""
-    empty_task = {"session_id": None, "user_id": "mock-user-id", "description": ""}
+def test_input_task_rai_failure(monkeypatch):
+    """
+    Test the /input_task endpoint when the RAI check fails.
+    The endpoint should print "RAI failed", track the event, and return {"status": "Plan not created"}.
+    """
+    # Override rai_success to return False
+    monkeypatch.setattr("src.backend.app.rai_success", lambda description: False)
+    payload = {"session_id": "", "description": "This should fail RAI"}
     headers = {"Authorization": "Bearer mock-token"}
-    response = client.post("/input_task", json=empty_task, headers=headers)
+    response = client.post("/input_task", json=payload, headers=headers)
+    assert response.status_code == 200
+    data = response.json()
+    assert data["status"] == "Plan not created"
 
-    assert response.status_code == 422
-    assert "detail" in response.json()  # Assert error message for missing description
+
+def test_get_plans_not_found():
+    """
+    Test the /plans endpoint when a session_id is provided that does not exist.
+    Expect a 404 error with detail "Plan not found".
+    """
+    headers = {"Authorization": "Bearer mock-token"}
+    response = client.get("/plans", params={"session_id": "nonexistent"}, headers=headers)
+    assert response.status_code == 404
+    assert response.json()["detail"] == "Plan not found"
 
 
 if __name__ == "__main__":