diff --git a/requirements.txt b/requirements.txt index 6ca5619c..eb768781 100644 --- a/requirements.txt +++ b/requirements.txt @@ -97,3 +97,4 @@ wheel==0.45.1 wrapt==1.17.2 yarl==1.18.3 zstandard==0.23.0 +tensorflow==2.20.0 diff --git a/tests/unit/memory/test_memory_saver.py b/tests/unit/memory/test_memory_saver.py new file mode 100644 index 00000000..2f6d4142 --- /dev/null +++ b/tests/unit/memory/test_memory_saver.py @@ -0,0 +1,158 @@ +import pytest +from app.bot.memory import MemorySaverInMemory +from app.bot.memory.models import State +from app.bot.dialogue_manager.models import UserMessage + + +class TestMemorySaverInMemory: + @pytest.fixture + def memory_saver(self): + """Fixture to create a fresh MemorySaverInMemory instance.""" + return MemorySaverInMemory() + + @pytest.fixture + def sample_state(self): + """Fixture to create a sample State object.""" + user_message = UserMessage( + thread_id="thread_123", text="Hello", context={"user_id": "123"} + ) + return State( + thread_id="thread_123", + user_message=user_message, + bot_message=[{"text": "Hi there!"}], + context={"user_id": "123"}, + intent={"name": "greet", "confidence": 0.9}, + parameters=[{"name": "location", "value": "New York"}], + extracted_parameters={"location": "New York"}, + missing_parameters=[], + complete=True, + current_node="greet_node", + ) + + @pytest.mark.asyncio + async def test_init(self, memory_saver): + """Test MemorySaverInMemory initialization.""" + assert memory_saver.memory == {} + assert hasattr(memory_saver, "save") + assert hasattr(memory_saver, "get") + assert hasattr(memory_saver, "get_all") + + @pytest.mark.asyncio + async def test_init_state(self, memory_saver): + """Test init_state method.""" + thread_id = "test_thread" + state = await memory_saver.init_state(thread_id) + + assert isinstance(state, State) + assert state.thread_id == thread_id + assert state.user_message is None + assert state.bot_message is None + assert state.context == {} + assert state.intent == {} + assert state.parameters == [] + assert state.extracted_parameters == {} + assert state.missing_parameters == [] + assert state.complete is False + assert state.current_node == "" + + @pytest.mark.asyncio + async def test_save_new_thread(self, memory_saver, sample_state): + """Test saving state for a new thread.""" + await memory_saver.save(sample_state.thread_id, sample_state) + + assert sample_state.thread_id in memory_saver.memory + assert len(memory_saver.memory[sample_state.thread_id]) == 1 + assert memory_saver.memory[sample_state.thread_id][0] == sample_state + + @pytest.mark.asyncio + async def test_save_existing_thread(self, memory_saver, sample_state): + """Test saving multiple states for the same thread.""" + # Save first state + await memory_saver.save(sample_state.thread_id, sample_state) + + # Create and save second state + second_state = State( + thread_id=sample_state.thread_id, + user_message=UserMessage( + thread_id=sample_state.thread_id, + text="How are you?", + context={"user_id": "123"}, + ), + context={"user_id": "123"}, + ) + await memory_saver.save(sample_state.thread_id, second_state) + + assert len(memory_saver.memory[sample_state.thread_id]) == 2 + assert memory_saver.memory[sample_state.thread_id][0] == sample_state + assert memory_saver.memory[sample_state.thread_id][1] == second_state + + @pytest.mark.asyncio + async def test_get_existing_thread(self, memory_saver, sample_state): + """Test getting the latest state for an existing thread.""" + await memory_saver.save(sample_state.thread_id, sample_state) + + retrieved_state = await memory_saver.get(sample_state.thread_id) + + assert retrieved_state == sample_state + assert retrieved_state.thread_id == sample_state.thread_id + + @pytest.mark.asyncio + async def test_get_nonexistent_thread(self, memory_saver): + """Test getting state for a nonexistent thread.""" + retrieved_state = await memory_saver.get("nonexistent_thread") + + assert retrieved_state is None + + @pytest.mark.asyncio + async def test_get_all_existing_thread(self, memory_saver, sample_state): + """Test getting all states for an existing thread.""" + await memory_saver.save(sample_state.thread_id, sample_state) + + # Add another state + second_state = State(thread_id=sample_state.thread_id) + await memory_saver.save(sample_state.thread_id, second_state) + + all_states = await memory_saver.get_all(sample_state.thread_id) + + assert len(all_states) == 2 + assert all_states[0] == sample_state + assert all_states[1] == second_state + + @pytest.mark.asyncio + async def test_get_all_nonexistent_thread(self, memory_saver): + """Test getting all states for a nonexistent thread.""" + all_states = await memory_saver.get_all("nonexistent_thread") + + assert all_states == [] + + @pytest.mark.asyncio + async def test_multiple_threads_isolation(self, memory_saver, sample_state): + """Test that different threads maintain separate state.""" + # Create states for different threads + thread1_state = sample_state + thread2_state = State( + thread_id="thread_456", + user_message=UserMessage( + thread_id="thread_456", text="Goodbye", context={"user_id": "456"} + ), + ) + + await memory_saver.save(thread1_state.thread_id, thread1_state) + await memory_saver.save(thread2_state.thread_id, thread2_state) + + # Verify isolation + thread1_retrieved = await memory_saver.get(thread1_state.thread_id) + thread2_retrieved = await memory_saver.get(thread2_state.thread_id) + + assert thread1_retrieved == thread1_state + assert thread2_retrieved == thread2_state + assert thread1_retrieved != thread2_retrieved + + # Verify get_all isolation + thread1_all = await memory_saver.get_all(thread1_state.thread_id) + thread2_all = await memory_saver.get_all(thread2_state.thread_id) + + assert len(thread1_all) == 1 + assert len(thread2_all) == 1 + assert thread1_all[0] == thread1_state + assert thread2_all[0] == thread2_state diff --git a/tests/unit/memory/test_memory_saver_mongo.py b/tests/unit/memory/test_memory_saver_mongo.py new file mode 100644 index 00000000..dd5d3080 --- /dev/null +++ b/tests/unit/memory/test_memory_saver_mongo.py @@ -0,0 +1,222 @@ +import pytest +from unittest.mock import AsyncMock, MagicMock, patch +from motor.motor_asyncio import AsyncIOMotorClient +from app.bot.memory.memory_saver_mongo import MemorySaverMongo +from app.bot.memory.models import State +from app.bot.dialogue_manager.models import UserMessage + + +class TestMemorySaverMongo: + @pytest.fixture + def mock_client(self): + """Fixture to create a mocked MongoDB client.""" + client = MagicMock(spec=AsyncIOMotorClient) + db = MagicMock() + collection = MagicMock() + + client.get_database.return_value = db + db.get_collection.return_value = collection + + return client + + @pytest.fixture + def memory_saver(self, mock_client): + """Fixture to create a MemorySaverMongo instance with mocked client.""" + return MemorySaverMongo(mock_client) + + @pytest.fixture + def sample_state(self): + """Fixture to create a sample State object.""" + user_message = UserMessage( + thread_id="thread_123", text="Hello", context={"user_id": "123"} + ) + return State( + thread_id="thread_123", + user_message=user_message, + bot_message=[{"text": "Hi there!"}], + context={"user_id": "123"}, + intent={"name": "greet", "confidence": 0.9}, + parameters=[{"name": "location", "value": "New York"}], + extracted_parameters={"location": "New York"}, + missing_parameters=[], + complete=True, + current_node="greet_node", + ) + + def test_init(self, mock_client): + """Test MemorySaverMongo initialization.""" + saver = MemorySaverMongo(mock_client) + + assert saver.client == mock_client + mock_client.get_database.assert_called_once_with("chatbot") + mock_client.get_database.return_value.get_collection.assert_called_once_with( + "state" + ) + + @pytest.mark.asyncio + async def test_save(self, memory_saver, sample_state): + """Test saving state to MongoDB.""" + # Mock the insert_one method + memory_saver.collection.insert_one = AsyncMock() + + await memory_saver.save(sample_state.thread_id, sample_state) + + memory_saver.collection.insert_one.assert_called_once() + call_args = memory_saver.collection.insert_one.call_args[0][0] + + # Verify the document structure + assert call_args["thread_id"] == sample_state.thread_id + assert "user_message" in call_args + assert "bot_message" in call_args + assert call_args["context"] == sample_state.context + assert call_args["intent"] == sample_state.intent + assert call_args["parameters"] == sample_state.parameters + assert call_args["complete"] == sample_state.complete + assert call_args["current_node"] == sample_state.current_node + + @pytest.mark.asyncio + async def test_get_existing_state(self, memory_saver, sample_state): + """Test getting existing state from MongoDB.""" + # Mock the find_one method to return state data + mock_result = { + "thread_id": sample_state.thread_id, + "context": sample_state.context, + "intent": sample_state.intent, + "parameters": sample_state.parameters, + "extracted_parameters": sample_state.extracted_parameters, + "missing_parameters": sample_state.missing_parameters, + "complete": sample_state.complete, + "current_node": sample_state.current_node, + } + memory_saver.collection.find_one = AsyncMock(return_value=mock_result) + + result = await memory_saver.get(sample_state.thread_id) + + assert isinstance(result, State) + assert result.thread_id == sample_state.thread_id + assert result.context == sample_state.context + assert result.intent == sample_state.intent + assert result.parameters == sample_state.parameters + assert result.complete == sample_state.complete + assert result.current_node == sample_state.current_node + + # Verify the query parameters + memory_saver.collection.find_one.assert_called_once() + call_args = memory_saver.collection.find_one.call_args + assert call_args[0][0] == {"thread_id": sample_state.thread_id} + assert call_args[0][1] == { + "_id": 0, + "nlu": 0, + "date": 0, + "user_message": 0, + "bot_message": 0, + } + assert call_args[1] == {"sort": [("$natural", -1)]} + + @pytest.mark.asyncio + async def test_get_nonexistent_state(self, memory_saver): + """Test getting state that doesn't exist in MongoDB.""" + memory_saver.collection.find_one = AsyncMock(return_value=None) + + result = await memory_saver.get("nonexistent_thread") + + assert result is None + memory_saver.collection.find_one.assert_called_once() + + @pytest.mark.asyncio + async def test_get_all_states(self, memory_saver, sample_state): + """Test getting all states for a thread from MongoDB.""" + mock_results = [ + { + "thread_id": sample_state.thread_id, + "context": sample_state.context, + "intent": sample_state.intent, + "parameters": sample_state.parameters, + "extracted_parameters": sample_state.extracted_parameters, + "missing_parameters": sample_state.missing_parameters, + "complete": sample_state.complete, + "current_node": sample_state.current_node, + }, + { + "thread_id": sample_state.thread_id, + "context": {"user_id": "456"}, + "intent": {"name": "bye", "confidence": 0.8}, + "parameters": [], + "extracted_parameters": {}, + "missing_parameters": [], + "complete": False, + "current_node": "bye_node", + }, + ] + + # Mock the find method to return an async cursor + mock_cursor = MagicMock() + mock_cursor.to_list = AsyncMock(return_value=mock_results) + memory_saver.collection.find = MagicMock(return_value=mock_cursor) + + results = await memory_saver.get_all(sample_state.thread_id) + + assert len(results) == 2 + assert all(isinstance(state, State) for state in results) + assert results[0].thread_id == sample_state.thread_id + assert results[1].thread_id == sample_state.thread_id + + # Verify the query + memory_saver.collection.find.assert_called_once_with( + {"thread_id": sample_state.thread_id}, sort=[("$natural", -1)] + ) + + @pytest.mark.asyncio + async def test_get_all_empty_results(self, memory_saver): + """Test getting all states when no states exist.""" + mock_cursor = MagicMock() + mock_cursor.to_list = AsyncMock(return_value=[]) + memory_saver.collection.find = MagicMock(return_value=mock_cursor) + + results = await memory_saver.get_all("empty_thread") + + assert results == [] + memory_saver.collection.find.assert_called_once() + + @pytest.mark.asyncio + async def test_init_state(self, memory_saver): + """Test init_state method (inherited from base class).""" + thread_id = "test_thread" + state = await memory_saver.init_state(thread_id) + + assert isinstance(state, State) + assert state.thread_id == thread_id + assert state.user_message is None + assert state.bot_message is None + assert state.context == {} + assert state.intent == {} + assert state.parameters == [] + assert state.extracted_parameters == {} + assert state.missing_parameters == [] + assert state.complete is False + assert state.current_node == "" + + @pytest.mark.asyncio + async def test_database_error_handling(self, memory_saver, sample_state): + """Test error handling for database operations.""" + # Test save error + memory_saver.collection.insert_one = AsyncMock( + side_effect=Exception("DB Error") + ) + + with pytest.raises(Exception, match="DB Error"): + await memory_saver.save(sample_state.thread_id, sample_state) + + # Test get error + memory_saver.collection.find_one = AsyncMock(side_effect=Exception("DB Error")) + + with pytest.raises(Exception, match="DB Error"): + await memory_saver.get("thread_123") + + # Test get_all error + mock_cursor = MagicMock() + mock_cursor.to_list = AsyncMock(side_effect=Exception("DB Error")) + memory_saver.collection.find = MagicMock(return_value=mock_cursor) + + with pytest.raises(Exception, match="DB Error"): + await memory_saver.get_all("thread_123") diff --git a/tests/unit/memory/test_models.py b/tests/unit/memory/test_models.py new file mode 100644 index 00000000..143d52c0 --- /dev/null +++ b/tests/unit/memory/test_models.py @@ -0,0 +1,281 @@ +import pytest +from datetime import datetime, UTC +from unittest.mock import MagicMock +from app.bot.memory.models import State +from app.bot.dialogue_manager.models import UserMessage + + +class TestState: + @pytest.fixture + def sample_user_message(self): + """Fixture to create a sample UserMessage.""" + return UserMessage( + thread_id="thread_123", + text="Hello, I want to book a flight", + context={"user_id": "user123", "session_id": "session456"}, + ) + + @pytest.fixture + def sample_state(self, sample_user_message): + """Fixture to create a sample State object.""" + return State( + thread_id="thread_123", + user_message=sample_user_message, + bot_message=[{"text": "I'd be happy to help you book a flight!"}], + context={"user_id": "user123", "session_id": "session456"}, + intent={"name": "book_flight", "confidence": 0.95}, + parameters=[ + {"name": "destination", "value": "New York", "required": True}, + {"name": "departure_date", "required": True}, + ], + extracted_parameters={"destination": "New York"}, + missing_parameters=["departure_date"], + complete=False, + current_node="collect_departure_date", + ) + + def test_init_minimal(self): + """Test State initialization with minimal parameters.""" + thread_id = "test_thread" + state = State(thread_id=thread_id) + + assert state.thread_id == thread_id + assert state.user_message is None + assert state.bot_message is None + assert state.context == {} + assert state.intent == {} + assert state.parameters == [] + assert state.extracted_parameters == {} + assert state.missing_parameters == [] + assert state.complete is False + assert state.current_node == "" + assert isinstance(state.date, datetime) + + def test_init_full(self, sample_user_message): + """Test State initialization with all parameters.""" + thread_id = "thread_123" + custom_date = datetime(2023, 1, 1, 12, 0, 0, tzinfo=UTC) + + state = State( + thread_id=thread_id, + user_message=sample_user_message, + bot_message=[{"text": "Hello!"}], + context={"key": "value"}, + intent={"name": "test_intent"}, + parameters=[{"name": "param1"}], + extracted_parameters={"param1": "value1"}, + missing_parameters=["param2"], + complete=True, + current_node="test_node", + date=custom_date, + ) + + assert state.thread_id == thread_id + assert state.user_message == sample_user_message + assert state.bot_message == [{"text": "Hello!"}] + assert state.context == {"key": "value"} + assert state.intent == {"name": "test_intent"} + assert state.parameters == [{"name": "param1"}] + assert state.extracted_parameters == {"param1": "value1"} + assert state.missing_parameters == ["param2"] + assert state.complete is True + assert state.current_node == "test_node" + assert state.date == custom_date + + def test_to_dict(self, sample_state): + """Test converting State to dictionary.""" + state_dict = sample_state.to_dict() + + assert isinstance(state_dict, dict) + assert state_dict["thread_id"] == sample_state.thread_id + assert state_dict["context"] == sample_state.context + assert state_dict["intent"] == sample_state.intent + assert state_dict["parameters"] == sample_state.parameters + assert state_dict["extracted_parameters"] == sample_state.extracted_parameters + assert state_dict["missing_parameters"] == sample_state.missing_parameters + assert state_dict["complete"] == sample_state.complete + assert state_dict["current_node"] == sample_state.current_node + assert "date" in state_dict + assert "user_message" in state_dict + assert "bot_message" in state_dict + assert "nlu" in state_dict + + def test_from_dict_minimal(self): + """Test creating State from minimal dictionary.""" + state_dict = { + "thread_id": "thread_123", + "context": {}, + "intent": {}, + "parameters": [], + "extracted_parameters": {}, + "missing_parameters": [], + "complete": False, + "current_node": "", + } + + state = State.from_dict(state_dict) + + assert isinstance(state, State) + assert state.thread_id == "thread_123" + assert state.context == {} + assert state.intent == {} + assert state.parameters == [] + assert state.extracted_parameters == {} + assert state.missing_parameters == [] + assert state.complete is False + assert state.current_node == "" + + def test_from_dict_full(self, sample_user_message): + """Test creating State from full dictionary.""" + state_dict = { + "thread_id": "thread_123", + "user_message": sample_user_message.to_dict(), + "bot_message": [{"text": "Response"}], + "nlu": {"tokens": ["hello"]}, + "context": {"user_id": "123"}, + "intent": {"name": "greet", "confidence": 0.9}, + "parameters": [{"name": "location"}], + "extracted_parameters": {"location": "NYC"}, + "missing_parameters": ["date"], + "complete": True, + "current_node": "confirmation", + "date": datetime(2023, 1, 1, 12, 0, 0, tzinfo=UTC), + } + + state = State.from_dict(state_dict) + + assert isinstance(state, State) + assert state.thread_id == "thread_123" + assert state.context == {"user_id": "123"} + assert state.intent == {"name": "greet", "confidence": 0.9} + assert state.parameters == [{"name": "location"}] + assert state.extracted_parameters == {"location": "NYC"} + assert state.missing_parameters == ["date"] + assert state.complete is True + assert state.current_node == "confirmation" + + def test_update_incomplete_state(self, sample_user_message): + """Test updating an incomplete state.""" + state = State(thread_id="thread_123", complete=False) + original_date = state.date + + # Wait a bit to ensure date difference + import time + + time.sleep(0.001) + + state.update(sample_user_message) + + assert state.user_message == sample_user_message + assert state.context == sample_user_message.context + assert state.date > original_date # Date should be updated + + def test_update_complete_state(self, sample_user_message): + """Test updating a complete state (should reset completion status).""" + state = State( + thread_id="thread_123", + complete=True, + intent={"name": "old_intent"}, + parameters=[{"name": "old_param"}], + extracted_parameters={"old": "value"}, + missing_parameters=["old_missing"], + current_node="old_node", + bot_message=[{"text": "old message"}], + ) + + state.update(sample_user_message) + + # Complete state should be reset + assert state.complete is False + assert state.intent is None + assert state.parameters == [] + assert state.extracted_parameters == {} + assert state.missing_parameters == [] + assert state.current_node is None + assert state.bot_message == [] + assert state.user_message == sample_user_message + assert state.context == sample_user_message.context + + def test_get_active_intent_id_with_intent(self): + """Test getting active intent ID when intent exists.""" + state = State(thread_id="thread_123") + state.intent = {"id": "intent_123", "name": "test_intent"} + + active_intent_id = state.get_active_intent_id() + + assert active_intent_id == "intent_123" + + def test_get_active_intent_id_without_intent(self): + """Test getting active intent ID when no intent exists.""" + state = State(thread_id="thread_123") + + active_intent_id = state.get_active_intent_id() + + assert active_intent_id is None + + def test_get_active_intent_id_empty_intent(self): + """Test getting active intent ID when intent is empty.""" + state = State(thread_id="thread_123") + state.intent = {} + + active_intent_id = state.get_active_intent_id() + + assert active_intent_id is None + + def test_date_auto_generation(self): + """Test that date is automatically generated if not provided.""" + before_creation = datetime.now(UTC) + state = State(thread_id="thread_123") + after_creation = datetime.now(UTC) + + assert before_creation <= state.date <= after_creation + + def test_nlu_initialization(self): + """Test that NLU is initialized as empty dict.""" + state = State(thread_id="thread_123") + + assert state.nlu == {} + + def test_context_initialization(self): + """Test context initialization.""" + # Test with None + state1 = State(thread_id="thread_123", context=None) + assert state1.context == {} + + # Test with provided context + context = {"key": "value"} + state2 = State(thread_id="thread_123", context=context) + assert state2.context == context + + def test_parameters_initialization(self): + """Test parameters initialization.""" + # Test with None + state1 = State(thread_id="thread_123", parameters=None) + assert state1.parameters == [] + + # Test with provided parameters + parameters = [{"name": "test"}] + state2 = State(thread_id="thread_123", parameters=parameters) + assert state2.parameters == parameters + + def test_extracted_parameters_initialization(self): + """Test extracted_parameters initialization.""" + # Test with None + state1 = State(thread_id="thread_123", extracted_parameters=None) + assert state1.extracted_parameters == {} + + # Test with provided extracted_parameters + extracted = {"key": "value"} + state2 = State(thread_id="thread_123", extracted_parameters=extracted) + assert state2.extracted_parameters == extracted + + def test_missing_parameters_initialization(self): + """Test missing_parameters initialization.""" + # Test with None + state1 = State(thread_id="thread_123", missing_parameters=None) + assert state1.missing_parameters == [] + + # Test with provided missing_parameters + missing = ["param1", "param2"] + state2 = State(thread_id="thread_123", missing_parameters=missing) + assert state2.missing_parameters == missing diff --git a/tests/unit/nlu/conftest.py b/tests/unit/nlu/conftest.py new file mode 100644 index 00000000..e9f40716 --- /dev/null +++ b/tests/unit/nlu/conftest.py @@ -0,0 +1,42 @@ +import pytest +from unittest.mock import Mock, MagicMock, patch +import spacy + + +@pytest.fixture +def mock_spacy_load(): + with patch("spacy.load") as mock_load: + mock_nlp = Mock() + mock_doc = Mock() + mock_doc.vector = [0.1, 0.2, 0.3] + mock_nlp.return_value = mock_doc + mock_load.return_value = mock_nlp + yield mock_load + + +@pytest.fixture +def sample_training_data(): + return [ + {"text": "Hello world", "intent": "greet", "spacy_doc": None, "entities": []}, + {"text": "Order pizza", "intent": "order_pizza", "spacy_doc": None, "entities": []}, + ] + + +@pytest.fixture +def mock_cloudpickle_dump_load(): + with patch("cloudpickle.dump") as mock_dump, patch("cloudpickle.load") as mock_load: + yield mock_dump, mock_load + + +@pytest.fixture +def mock_pycrfsuite_tagger(): + with patch("pycrfsuite.Tagger") as mock_tagger: + yield mock_tagger + + +@pytest.fixture +def mock_openai_chain(): + with patch("app.bot.nlu.llm.zero_shot_nlu_openai.ChatOpenAI") as mock_llm, \ + patch("app.bot.nlu.llm.zero_shot_nlu_openai.ChatPromptTemplate.from_messages") as mock_prompt, \ + patch("app.bot.nlu.llm.zero_shot_nlu_openai.JsonOutputParser") as mock_parser: + yield mock_llm, mock_prompt, mock_parser diff --git a/tests/unit/nlu/test_crf_entity_extractor.py b/tests/unit/nlu/test_crf_entity_extractor.py new file mode 100644 index 00000000..3d757e50 --- /dev/null +++ b/tests/unit/nlu/test_crf_entity_extractor.py @@ -0,0 +1,144 @@ + +import sys +import os +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../../')) + +import pytest +from unittest.mock import Mock, patch, MagicMock +from app.bot.nlu.entity_extractors.crf_entity_extractor import CRFEntityExtractor + +class TestCRFEntityExtractor: + class Token: + def __init__(self, i, text, pos_, tag_): + self.i = i + self.text = text + self.pos_ = pos_ + self.tag_ = tag_ + self.vector = [0.1, 0.2, 0.3] # Add vector attribute for spacy compatibility + + def test_init(self): + """Test CRFEntityExtractor initialization.""" + extractor = CRFEntityExtractor() + assert extractor.tagger is None + + @patch("pycrfsuite.Trainer") + def test_train(self, mock_trainer, mock_spacy_load, sample_training_data): + """Test training the CRF extractor.""" + # Setup mocks + mock_train_instance = Mock() + mock_trainer.return_value = mock_train_instance + + extractor = CRFEntityExtractor() + + mock_doc = MagicMock() + mock_doc.__iter__.return_value = iter([ + self.Token(0, "Hello", "NOUN", "UH"), + self.Token(1, "world", "NN", "NN") + ]) + mock_doc.char_span.return_value = [self.Token(0, "Hello", "NOUN", "UH")] + + for example in sample_training_data: + example["spacy_doc"] = mock_doc + + # Ensure /tmp/model exists + if not os.path.exists("/tmp/model"): + os.makedirs("/tmp/model") + + extractor.train(sample_training_data, "/tmp/model") + mock_train_instance.train.assert_called_once() + + def test_load_success(self, mock_pycrfsuite_tagger): + """Test successful model loading.""" + mock_tagger_instance = Mock() + mock_pycrfsuite_tagger.return_value = mock_tagger_instance + + extractor = CRFEntityExtractor() + + with patch("os.path.join"), \ + patch("builtins.open", new_callable=MagicMock): + mock_doc = MagicMock() + mock_doc.__iter__.return_value = iter([ + TestCRFEntityExtractor.Token(0, "Hello", "NOUN", "UH"), + TestCRFEntityExtractor.Token(1, "world", "NN", "NN") + ]) + mock_doc.char_span.return_value = [TestCRFEntityExtractor.Token(0, "Hello", "NOUN", "UH")] + + def test_load_failure(self): + """Test model loading failure.""" + extractor = CRFEntityExtractor() + + mock_tagger = Mock() + mock_tagger.open.side_effect = IOError("File not found") + + with patch("os.path.join"), \ + patch("pycrfsuite.Tagger", return_value=mock_tagger): + result = extractor.load("/tmp/model") + + assert result is False + + def test_extract_features(self, mock_spacy_load): + """Test feature extraction for CRF.""" + extractor = CRFEntityExtractor() + + sent = [("Hello", "UH"), ("world", "NN")] + features = extractor.extract_features(sent, 0) + + assert "bias" in features + assert "word.lower=hello" in features + assert "postag=UH" in features + + def test_sent_to_features(self, mock_spacy_load): + """Test converting sentence to features.""" + extractor = CRFEntityExtractor() + + sent = [("Hello", "UH"), ("world", "NN")] + features = extractor.sent_to_features(sent) + + assert len(features) == 2 + assert isinstance(features[0], list) + + def test_sent_to_labels(self, mock_spacy_load): + """Test extracting labels from sentence.""" + extractor = CRFEntityExtractor() + + sent = [("Hello", "UH", "O"), ("world", "NN", "O")] + labels = extractor.sent_to_labels(sent) + + assert labels == ["O", "O"] + + def test_process_with_text_and_spacy_doc(self, mock_spacy_load, mock_pycrfsuite_tagger): + """Test processing message with text and spacy_doc.""" + mock_tagger_instance = Mock() + mock_tagger_instance.tag.return_value = ["B-PERSON", "O"] + mock_pycrfsuite_tagger.return_value = mock_tagger_instance + + extractor = CRFEntityExtractor() + extractor.tagger = mock_tagger_instance + + mock_doc = MagicMock() + mock_doc.__iter__.return_value = iter([ + self.Token(0, "John", "PROPN", "NNP"), + self.Token(1, "said", "VERB", "VB") + ]) + mock_doc.char_span.return_value = [self.Token(0, "John", "PROPN", "NNP")] + + message = {"text": "John said", "spacy_doc": mock_doc} + + result = extractor.process(message) + assert "entities" in result + + def test_process_without_text(self): + """Test processing without text.""" + extractor = CRFEntityExtractor() + message = {"other": "data"} + + result = extractor.process(message) + assert result == message + + def test_process_without_spacy_doc(self): + """Test processing without spacy_doc.""" + extractor = CRFEntityExtractor() + message = {"text": "Hello world"} + + result = extractor.process(message) + assert result == message diff --git a/tests/unit/nlu/test_nlu_pipeline.py b/tests/unit/nlu/test_nlu_pipeline.py new file mode 100644 index 00000000..7920ac8a --- /dev/null +++ b/tests/unit/nlu/test_nlu_pipeline.py @@ -0,0 +1,122 @@ +import sys +import os +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../../')) + +import pytest +from unittest.mock import Mock, patch +from app.bot.nlu.pipeline import NLUPipeline, NLUComponent + + +class TestNLUPipeline: + def test_init_without_components(self): + """Test NLUPipeline initialization without components.""" + pipeline = NLUPipeline() + assert pipeline.components == [] + + def test_init_with_components(self): + """Test NLUPipeline initialization with components.""" + component1 = Mock(spec=NLUComponent) + component2 = Mock(spec=NLUComponent) + + pipeline = NLUPipeline([component1, component2]) + assert len(pipeline.components) == 2 + assert pipeline.components[0] == component1 + assert pipeline.components[1] == component2 + + def test_add_component(self): + """Test adding a component to the pipeline.""" + pipeline = NLUPipeline() + component = Mock(spec=NLUComponent) + + pipeline.add_component(component) + assert len(pipeline.components) == 1 + assert pipeline.components[0] == component + + @patch("os.path.exists") + @patch("os.makedirs") + def test_train(self, mock_makedirs, mock_exists): + """Test training all components in the pipeline.""" + mock_exists.return_value = False + + component1 = Mock(spec=NLUComponent) + component2 = Mock(spec=NLUComponent) + + pipeline = NLUPipeline([component1, component2]) + training_data = [{"text": "Hello", "intent": "greet"}] + + pipeline.train(training_data, "/tmp/model") + + component1.train.assert_called_once_with(training_data, "/tmp/model") + component2.train.assert_called_once_with(training_data, "/tmp/model") + mock_makedirs.assert_called_once_with("/tmp/model") + + def test_load_success(self): + """Test successful loading of all components.""" + component1 = Mock(spec=NLUComponent) + component1.load.return_value = True + component2 = Mock(spec=NLUComponent) + component2.load.return_value = True + + pipeline = NLUPipeline([component1, component2]) + + result = pipeline.load("/tmp/model") + assert result is True + + component1.load.assert_called_once_with("/tmp/model") + component2.load.assert_called_once_with("/tmp/model") + + def test_load_failure(self): + """Test loading failure when one component fails.""" + component1 = Mock(spec=NLUComponent) + component1.load.return_value = True + component2 = Mock(spec=NLUComponent) + component2.load.return_value = False + + pipeline = NLUPipeline([component1, component2]) + + result = pipeline.load("/tmp/model") + assert result is False + + component1.load.assert_called_once_with("/tmp/model") + component2.load.assert_called_once_with("/tmp/model") + + def test_process(self): + """Test processing message through all components.""" + component1 = Mock(spec=NLUComponent) + component1.process.return_value = {"text": "Hello", "processed1": True} + + component2 = Mock(spec=NLUComponent) + component2.process.return_value = {"text": "Hello", "processed1": True, "processed2": True} + + pipeline = NLUPipeline([component1, component2]) + message = {"text": "Hello"} + + result = pipeline.process(message) + + assert result["processed1"] is True + assert result["processed2"] is True + + # Verify components are called in order + component1.process.assert_called_once_with(message) + component2.process.assert_called_once_with({"text": "Hello", "processed1": True}) + + def test_process_empty_pipeline(self): + """Test processing with empty pipeline.""" + pipeline = NLUPipeline() + message = {"text": "Hello"} + + result = pipeline.process(message) + assert result == message + + def test_process_single_component(self): + """Test processing with single component.""" + component = Mock(spec=NLUComponent) + component.process.return_value = {"text": "Hello", "processed": True} + + pipeline = NLUPipeline([component]) + message = {"text": "Hello"} + + result = pipeline.process(message) + + assert result["processed"] is True + component.process.assert_called_once_with(message) diff --git a/tests/unit/nlu/test_sklearn_intent_classifier.py b/tests/unit/nlu/test_sklearn_intent_classifier.py new file mode 100644 index 00000000..44ee44ac --- /dev/null +++ b/tests/unit/nlu/test_sklearn_intent_classifier.py @@ -0,0 +1,152 @@ +import sys +import os +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../../')) + +import pytest +import numpy as np +from unittest.mock import Mock, patch, MagicMock +from app.bot.nlu.intent_classifiers.sklearn_intent_classifer import SklearnIntentClassifier + + +class TestSklearnIntentClassifier: + def test_init(self): + """Test SklearnIntentClassifier initialization.""" + classifier = SklearnIntentClassifier() + assert classifier.model is None + + @patch("sklearn.model_selection.GridSearchCV") + @patch("sklearn.svm.SVC") + def test_train(self, mock_svc, mock_grid_search, mock_spacy_load, sample_training_data): + """Test training the classifier.""" + # Setup mocks + mock_classifier = Mock() + mock_classifier.best_estimator_ = Mock() + mock_grid_search.return_value = mock_classifier + + classifier = SklearnIntentClassifier() + + # Mock spacy doc with vector + mock_doc = Mock() + mock_doc.vector = np.array([0.1, 0.2, 0.3]) + for example in sample_training_data: + example["spacy_doc"] = mock_doc + + with patch("os.path.exists", return_value=True), \ + patch("os.makedirs"): + classifier.train(sample_training_data, "/tmp/model") + + assert classifier.model is not None + mock_grid_search.assert_called_once() + + def test_train_with_empty_text(self, mock_spacy_load): + """Test training with empty text.""" + classifier = SklearnIntentClassifier() + mock_doc = Mock() + mock_doc.vector = np.array([0.1, 0.2, 0.3]) + training_data = [{"text": "", "intent": "greet", "spacy_doc": mock_doc}] + + with patch("sklearn.model_selection.GridSearchCV") as mock_grid: + mock_grid.return_value = Mock() + with patch("numpy.stack", side_effect=ValueError("need at least one array to stack")): + try: + classifier.train(training_data, "/tmp/model") + except ValueError: + pass # Expected + + # Should not process empty text + assert classifier.model is None + + @patch("cloudpickle.load") + def test_load_success(self, mock_load, mock_cloudpickle_dump_load): + """Test successful model loading.""" + mock_load.return_value = Mock() + classifier = SklearnIntentClassifier() + + result = classifier.load("/tmp/model") + assert result is True + assert classifier.model is not None + + @patch("os.path.join") + @patch("builtins.open", new_callable=MagicMock) + def test_load_failure(self, mock_open, mock_join): + """Test model loading failure.""" + mock_join.return_value = "/tmp/model/sklearn_intent_model.hd5" + mock_open.side_effect = IOError("File not found") + + classifier = SklearnIntentClassifier() + result = classifier.load("/tmp/model") + + assert result is False + assert classifier.model is None + + def test_get_spacy_embedding(self, mock_spacy_load): + """Test spacy embedding extraction.""" + classifier = SklearnIntentClassifier() + mock_doc = Mock() + mock_doc.vector = np.array([0.1, 0.2, 0.3]) + + result = classifier.get_spacy_embedding(mock_doc) + np.testing.assert_array_equal(result, np.array([0.1, 0.2, 0.3])) + + def test_predict_proba(self, mock_spacy_load): + """Test probability prediction.""" + classifier = SklearnIntentClassifier() + classifier.model = Mock() + classifier.model.classes_ = ["greet", "goodbye"] + classifier.model.predict_proba.return_value = np.array([[0.8, 0.2]]) + + mock_doc = Mock() + mock_doc.vector = np.array([0.1, 0.2, 0.3]) + message = {"spacy_doc": mock_doc} + + intents, probabilities = classifier.predict_proba(message) + + assert len(intents) == 1 + assert len(probabilities) == 1 + + def test_process_with_model(self, mock_spacy_load): + """Test processing with trained model.""" + classifier = SklearnIntentClassifier() + classifier.model = Mock() + classifier.model.classes_ = ["greet", "goodbye"] + classifier.model.predict_proba.return_value = np.array([[0.8, 0.2]]) + + mock_doc = Mock() + mock_doc.vector = np.array([0.1, 0.2, 0.3]) + message = {"text": "Hello", "spacy_doc": mock_doc} + + result = classifier.process(message) + + assert "intent" in result + assert "intent_ranking" in result + assert result["intent"]["intent"] == "greet" + assert result["intent"]["confidence"] == 0.8 + + def test_process_without_text(self): + """Test processing without text.""" + classifier = SklearnIntentClassifier() + message = {"other": "data"} + + result = classifier.process(message) + assert result == message + + def test_process_without_spacy_doc(self): + """Test processing without spacy_doc.""" + classifier = SklearnIntentClassifier() + message = {"text": "Hello"} + + result = classifier.process(message) + assert result == message + + def test_process_without_model(self, mock_spacy_load): + """Test processing without trained model.""" + classifier = SklearnIntentClassifier() + mock_doc = Mock() + mock_doc.vector = np.array([0.1, 0.2, 0.3]) + message = {"text": "Hello", "spacy_doc": mock_doc} + + result = classifier.process(message) + + assert result["intent"]["name"] is None + assert result["intent"]["confidence"] == 0.0 + assert result["intent_ranking"] == [] diff --git a/tests/unit/nlu/test_spacy_featurizer.py b/tests/unit/nlu/test_spacy_featurizer.py new file mode 100644 index 00000000..f4f80040 --- /dev/null +++ b/tests/unit/nlu/test_spacy_featurizer.py @@ -0,0 +1,75 @@ +import sys +import os +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../../')) + +import pytest +from unittest.mock import Mock, patch +from app.bot.nlu.featurizers.spacy_featurizer import SpacyFeaturizer + + +class TestSpacyFeaturizer: + def test_init(self, mock_spacy_load): + """Test SpacyFeaturizer initialization.""" + featurizer = SpacyFeaturizer("en_core_web_sm") + assert featurizer.tokenizer is not None + mock_spacy_load.assert_called_once_with("en_core_web_sm") + + def test_train_with_valid_data(self, mock_spacy_load, sample_training_data): + """Test training with valid data.""" + featurizer = SpacyFeaturizer("en_core_web_sm") + mock_doc = Mock() + mock_doc.vector = [0.1, 0.2, 0.3] + featurizer.tokenizer.return_value = mock_doc + + featurizer.train(sample_training_data, "/tmp/model") + + # Check that spacy_doc was added to training data + assert sample_training_data[0]["spacy_doc"] is not None + assert sample_training_data[1]["spacy_doc"] is not None + + def test_train_with_empty_text(self, mock_spacy_load): + """Test training with empty text.""" + featurizer = SpacyFeaturizer("en_core_web_sm") + training_data = [{"text": "", "intent": "greet"}] + + featurizer.train(training_data, "/tmp/model") + + # Should not add spacy_doc for empty text + assert "spacy_doc" not in training_data[0] + + def test_load(self, mock_spacy_load): + """Test load method.""" + featurizer = SpacyFeaturizer("en_core_web_sm") + result = featurizer.load("/tmp/model") + assert result is True + + def test_process_with_text(self, mock_spacy_load): + """Test processing message with text.""" + featurizer = SpacyFeaturizer("en_core_web_sm") + mock_doc = Mock() + mock_doc.vector = [0.1, 0.2, 0.3] + featurizer.tokenizer.return_value = mock_doc + + message = {"text": "Hello world"} + result = featurizer.process(message) + + assert result["spacy_doc"] is not None + assert result["text"] == "Hello world" + + def test_process_without_text(self, mock_spacy_load): + """Test processing message without text.""" + featurizer = SpacyFeaturizer("en_core_web_sm") + message = {"other": "data"} + result = featurizer.process(message) + + assert result == message + assert "spacy_doc" not in result + + def test_process_with_empty_text(self, mock_spacy_load): + """Test processing message with empty text.""" + featurizer = SpacyFeaturizer("en_core_web_sm") + message = {"text": ""} + result = featurizer.process(message) + + assert result == message + assert "spacy_doc" not in result diff --git a/tests/unit/nlu/test_synonym_replacer.py b/tests/unit/nlu/test_synonym_replacer.py new file mode 100644 index 00000000..c546fbbb --- /dev/null +++ b/tests/unit/nlu/test_synonym_replacer.py @@ -0,0 +1,101 @@ +import sys +import os +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../../')) + +import pytest +from app.bot.nlu.entity_extractors.synonym_replacer import SynonymReplacer + + +class TestSynonymReplacer: + def test_init_with_synonyms(self): + """Test SynonymReplacer initialization with synonyms.""" + synonyms = {"hi": "hello", "bye": "goodbye"} + replacer = SynonymReplacer(synonyms) + assert replacer.synonyms == synonyms + + def test_init_without_synonyms(self): + """Test SynonymReplacer initialization without synonyms.""" + replacer = SynonymReplacer() + assert replacer.synonyms == {} + + def test_replace_synonyms_with_matches(self): + """Test synonym replacement with matching entities.""" + synonyms = {"ny": "new york", "la": "los angeles"} + replacer = SynonymReplacer(synonyms) + + entities = {"location": "ny", "destination": "la"} + result = replacer.replace_synonyms(entities) + + assert result["location"] == "new york" + assert result["destination"] == "los angeles" + + def test_replace_synonyms_without_matches(self): + """Test synonym replacement without matching entities.""" + synonyms = {"ny": "new york", "la": "los angeles"} + replacer = SynonymReplacer(synonyms) + + entities = {"location": "boston", "destination": "chicago"} + result = replacer.replace_synonyms(entities) + + assert result["location"] == "boston" + assert result["destination"] == "chicago" + + def test_replace_synonyms_mixed(self): + """Test synonym replacement with mixed matching and non-matching.""" + synonyms = {"ny": "new york"} + replacer = SynonymReplacer(synonyms) + + entities = {"location": "ny", "destination": "boston"} + result = replacer.replace_synonyms(entities) + + assert result["location"] == "new york" + assert result["destination"] == "boston" + + def test_replace_synonyms_case_insensitive(self): + """Test synonym replacement is case insensitive.""" + synonyms = {"ny": "new york"} + replacer = SynonymReplacer(synonyms) + + entities = {"location": "ny"} + result = replacer.replace_synonyms(entities) + + assert result["location"] == "new york" + + def test_train(self): + """Test train method (should do nothing).""" + replacer = SynonymReplacer() + # Should not raise any exception + replacer.train({}, "/tmp/model") + + def test_load(self): + """Test load method.""" + replacer = SynonymReplacer() + result = replacer.load("/tmp/model") + assert result is True + + def test_process_with_entities(self): + """Test processing message with entities.""" + synonyms = {"ny": "new york"} + replacer = SynonymReplacer(synonyms) + + message = {"text": "I live in ny", "entities": {"location": "ny"}} + result = replacer.process(message) + + assert result["entities"]["location"] == "new york" + assert result["text"] == "I live in ny" + + def test_process_without_entities(self): + """Test processing message without entities.""" + replacer = SynonymReplacer() + message = {"text": "Hello world"} + + result = replacer.process(message) + assert result == message + + def test_process_with_empty_entities(self): + """Test processing message with empty entities.""" + replacer = SynonymReplacer() + message = {"text": "Hello world", "entities": {}} + + result = replacer.process(message) + assert result == message diff --git a/tests/unit/nlu/test_tf_intent_classifier.py b/tests/unit/nlu/test_tf_intent_classifier.py new file mode 100644 index 00000000..762334b3 --- /dev/null +++ b/tests/unit/nlu/test_tf_intent_classifier.py @@ -0,0 +1,162 @@ +import sys +import os +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../../')) + +import pytest +import numpy as np +from unittest.mock import Mock, patch, MagicMock +from app.bot.nlu.intent_classifiers.tf_intent_classifer import TfIntentClassifier + + +class TestTfIntentClassifier: + @patch("tensorflow.keras.models.load_model") + @patch("tensorflow.keras.backend.clear_session") + @patch("spacy.load") + def test_init(self, mock_spacy_load, mock_clear_session, mock_load_model): + """Test TfIntentClassifier initialization.""" + mock_nlp = Mock() + mock_spacy_load.return_value = mock_nlp + + classifier = TfIntentClassifier() + assert classifier.model is None + assert classifier.nlp is not None + assert classifier.label_encoder is not None + + @patch("app.bot.nlu.intent_classifiers.tf_intent_classifer.Sequential") + @patch("app.bot.nlu.intent_classifiers.tf_intent_classifer.Dense") + @patch("app.bot.nlu.intent_classifiers.tf_intent_classifer.Dropout") + @patch("tensorflow.keras.backend.clear_session") + @patch("spacy.load") + def test_train(self, mock_spacy_load, mock_clear_session, mock_dropout, mock_dense, mock_sequential, sample_training_data): + """Test training the classifier.""" + # Setup mocks + mock_nlp = Mock() + mock_doc = Mock() + mock_doc.vector = np.array([0.1] * 384) # Mock embedding vector of size 384 + mock_nlp.return_value = mock_doc + mock_spacy_load.return_value = mock_nlp + + mock_model = Mock() + mock_model.fit = Mock(return_value=None) + mock_sequential.return_value = mock_model + + classifier = TfIntentClassifier() + + # One-hot encode intent labels for training data + intents = [example["intent"] for example in sample_training_data] + unique_intents = sorted(set(intents)) + intent_to_index = {intent: idx for idx, intent in enumerate(unique_intents)} + y_train = np.eye(len(unique_intents))[np.array([intent_to_index[i] for i in intents])] + + # Patch classifier.label_encoder to return the correct one-hot encoded labels + classifier.label_encoder = Mock() + classifier.label_encoder.transform.side_effect = lambda labels: np.eye(len(unique_intents))[np.array([intent_to_index[l] for l in labels])] + classifier.label_encoder.classes_ = np.array(unique_intents) + + with patch("os.path.exists", return_value=True), \ + patch("os.makedirs"), \ + patch("tensorflow.keras.models.save_model"): + classifier.train(sample_training_data, "/tmp/model") + + assert classifier.model is not None + classifier.model.fit.assert_called_once() + + + + @patch("spacy.load") + def test_load_failure(self, mock_spacy_load): + """Test model loading failure.""" + mock_nlp = Mock() + mock_spacy_load.return_value = mock_nlp + + classifier = TfIntentClassifier() + + with patch("os.path.join"), \ + patch("builtins.open", side_effect=IOError("File not found")), \ + patch("tensorflow.compat.v1.get_default_graph") as mock_get_graph: + mock_get_graph.return_value = Mock() + result = classifier.load("/tmp/model") + + assert result is False + + @patch("tensorflow.keras.backend.clear_session") + @patch("spacy.load") + def test_predict_proba(self, mock_spacy_load, mock_clear_session): + """Test probability prediction.""" + mock_nlp = Mock() + mock_doc = Mock() + mock_doc.vector = np.array([0.1] * 384) # Spacy vector size + mock_nlp.return_value = mock_doc + mock_spacy_load.return_value = mock_nlp + + classifier = TfIntentClassifier() + classifier.model = Mock() + classifier.model.predict.return_value = np.array([[0.8, 0.2]]) + classifier.label_encoder = Mock() + classifier.label_encoder.classes_ = ["greet", "goodbye"] + classifier.graph = Mock() + classifier.graph.as_default.return_value.__enter__ = Mock() + classifier.graph.as_default.return_value.__exit__ = Mock() + + message = {"text": "Hello"} + + intents, probabilities = classifier.predict_proba(message) + + assert len(intents) == 1 + assert len(probabilities) == 1 + + @patch("tensorflow.keras.backend.clear_session") + @patch("spacy.load") + def test_process_with_model(self, mock_spacy_load, mock_clear_session): + """Test processing with trained model.""" + mock_nlp = Mock() + mock_doc = Mock() + mock_doc.vector = np.array([0.1] * 384) # Spacy vector size + mock_nlp.return_value = mock_doc + mock_spacy_load.return_value = mock_nlp + + classifier = TfIntentClassifier() + classifier.model = Mock() + classifier.model.predict.return_value = np.array([[0.8, 0.2]]) + classifier.label_encoder = Mock() + classifier.label_encoder.classes_ = ["greet", "goodbye"] + classifier.graph = Mock() + classifier.graph.as_default.return_value.__enter__ = Mock() + classifier.graph.as_default.return_value.__exit__ = Mock() + + message = {"text": "Hello"} + + result = classifier.process(message) + + assert "intent" in result + assert "intent_ranking" in result + assert result["intent"]["intent"] == "greet" + assert result["intent"]["confidence"] == 0.8 + + @patch("spacy.load") + def test_process_without_text(self, mock_spacy_load): + """Test processing without text.""" + mock_nlp = Mock() + mock_spacy_load.return_value = mock_nlp + + classifier = TfIntentClassifier() + message = {"other": "data"} + + result = classifier.process(message) + assert result == message + + @patch("tensorflow.keras.backend.clear_session") + @patch("spacy.load") + def test_process_without_model(self, mock_spacy_load, mock_clear_session): + """Test processing without trained model.""" + mock_nlp = Mock() + mock_spacy_load.return_value = mock_nlp + + classifier = TfIntentClassifier() + message = {"text": "Hello"} + + result = classifier.process(message) + + assert result["intent"]["name"] is None + assert result["intent"]["confidence"] == 0.0 + assert result["intent_ranking"] == [] diff --git a/tests/unit/nlu/test_zero_shot_nlu_openai.py b/tests/unit/nlu/test_zero_shot_nlu_openai.py new file mode 100644 index 00000000..889094ab --- /dev/null +++ b/tests/unit/nlu/test_zero_shot_nlu_openai.py @@ -0,0 +1,149 @@ +import sys +import os +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../../')) + +import pytest +from unittest.mock import Mock, patch, MagicMock +from app.bot.nlu.llm.zero_shot_nlu_openai import ZeroShotNLUOpenAI + + +class TestZeroShotNLUOpenAI: + @patch("app.bot.nlu.llm.zero_shot_nlu_openai.Environment") + @patch("app.bot.nlu.llm.zero_shot_nlu_openai.FileSystemLoader") + def test_init(self, mock_loader, mock_env, mock_openai_chain): + """Test ZeroShotNLUOpenAI initialization.""" + mock_llm, mock_prompt, mock_parser = mock_openai_chain + + # Mock environment and template + mock_environment = Mock() + mock_env.return_value = mock_environment + mock_template = Mock() + mock_template.render.return_value = "system prompt" + mock_environment.get_template.return_value = mock_template + + # Mock chain + mock_chain = Mock() + mock_prompt.return_value = mock_chain + + intents = ["greet", "goodbye"] + entities = ["person", "location"] + + nlu = ZeroShotNLUOpenAI(intents=intents, entities=entities) + + assert nlu.intents == intents + assert nlu.entities == entities + assert nlu.llm is not None + assert nlu.chain is not None + + def test_train(self, mock_openai_chain): + """Test train method (should do nothing).""" + mock_llm, mock_prompt, mock_parser = mock_openai_chain + nlu = ZeroShotNLUOpenAI() + # Should not raise any exception + nlu.train([], "/tmp/model") + + def test_load(self, mock_openai_chain): + """Test load method.""" + mock_llm, mock_prompt, mock_parser = mock_openai_chain + nlu = ZeroShotNLUOpenAI() + result = nlu.load("/tmp/model") + assert result is True + + def test_process_success(self, mock_openai_chain): + """Test successful message processing.""" + mock_llm, mock_prompt, mock_parser = mock_openai_chain + + # Mock the chain directly + mock_chain = Mock() + mock_chain.invoke = Mock(return_value={ + "intent": "greet", + "entities": {"person": "John", "location": None} + }) + mock_prompt.return_value = mock_chain + + nlu = ZeroShotNLUOpenAI(intents=["greet"], entities=["person", "location"]) + # Mock the chain after initialization + nlu.chain = mock_chain + + message = {"text": "Hello John"} + + result = nlu.process(message) + + assert "intent" in result + assert result["intent"]["intent"] == "greet" + assert result["intent"]["confidence"] == 1.0 + assert "intent_ranking" in result + assert "entities" in result + assert result["entities"]["person"] == "John" + assert "location" not in result["entities"] # None values should be filtered out + + @patch("app.bot.nlu.llm.zero_shot_nlu_openai.Environment") + @patch("app.bot.nlu.llm.zero_shot_nlu_openai.FileSystemLoader") + def test_process_without_text(self, mock_loader, mock_env, mock_openai_chain): + """Test processing message without text.""" + mock_llm, mock_prompt, mock_parser = mock_openai_chain + + # Setup mocks + mock_environment = Mock() + mock_env.return_value = mock_environment + mock_template = Mock() + mock_template.render.return_value = "system prompt" + mock_environment.get_template.return_value = mock_template + + mock_chain = Mock() + mock_prompt.return_value = mock_chain + + nlu = ZeroShotNLUOpenAI() + message = {"other": "data"} + + result = nlu.process(message) + + # Should log warning and return original message + assert result == message + assert "intent" not in result + + def test_process_with_exception(self, mock_openai_chain): + """Test processing with exception handling.""" + mock_llm, mock_prompt, mock_parser = mock_openai_chain + + # Mock the chain directly + mock_chain = Mock() + mock_chain.invoke = Mock(side_effect=Exception("API Error")) + mock_prompt.return_value = mock_chain + + nlu = ZeroShotNLUOpenAI() + # Mock the chain after initialization + nlu.chain = mock_chain + + message = {"text": "Hello world"} + + result = nlu.process(message) + + assert result["intent"]["intent"] is None + assert result["intent"]["confidence"] == 0.0 + assert result["intent_ranking"] == [] + assert result["entities"] == {} + + def test_process_with_no_intent(self, mock_openai_chain): + """Test processing when no intent is detected.""" + mock_llm, mock_prompt, mock_parser = mock_openai_chain + + # Mock the chain directly + mock_chain = Mock() + mock_chain.invoke = Mock(return_value={ + "intent": None, + "entities": {} + }) + mock_prompt.return_value = mock_chain + + nlu = ZeroShotNLUOpenAI() + # Mock the chain after initialization + nlu.chain = mock_chain + + message = {"text": "Random text"} + + result = nlu.process(message) + + assert result["intent"]["intent"] is None + assert result["intent"]["confidence"] == 0.0 + assert "intent_ranking" not in result # Since intent is None, intent_ranking is not set