|
| 1 | +import pytest |
| 2 | +import email |
| 3 | +from email.mime.text import MIMEText |
| 4 | +from email.mime.multipart import MIMEMultipart |
| 5 | +from unittest.mock import AsyncMock, MagicMock, patch |
| 6 | +from datetime import datetime, timedelta |
| 7 | + |
| 8 | +from src.email_processor import EmailProcessor |
| 9 | +from src.models.email import Email, EmailStatus, EmailType |
| 10 | +from src.schemas.email import EmailCreate, EmailAttachmentCreate |
| 11 | +from src.core.config import settings |
| 12 | + |
| 13 | +# Test data |
| 14 | + |
| 15 | +TEST_SUBJECT = "Test Subject" |
| 16 | +TEST_BODY = "This is a test email body" |
| 17 | +TEST_MESSAGE_ID = "<[email protected]>" |
| 18 | + |
| 19 | +# Fixtures |
| 20 | +@pytest.fixture |
| 21 | +def email_processor(): |
| 22 | + return EmailProcessor() |
| 23 | + |
| 24 | +@pytest.fixture |
| 25 | +def mock_db_session(): |
| 26 | + session = AsyncMock() |
| 27 | + session.commit = AsyncMock() |
| 28 | + session.refresh = AsyncMock() |
| 29 | + return session |
| 30 | + |
| 31 | +@pytest.fixture |
| 32 | +def simple_email_message(): |
| 33 | + msg = MIMEText(TEST_BODY) |
| 34 | + msg["From"] = f"Test User <{TEST_EMAIL}>" |
| 35 | + |
| 36 | + msg["Subject"] = TEST_SUBJECT |
| 37 | + msg["Message-ID"] = TEST_MESSAGE_ID |
| 38 | + return msg |
| 39 | + |
| 40 | +@pytest.fixture |
| 41 | +def multipart_email_message(): |
| 42 | + msg = MIMEMultipart() |
| 43 | + msg["From"] = f"Test User <{TEST_EMAIL}>" |
| 44 | + |
| 45 | + msg["Subject"] = TEST_SUBJECT |
| 46 | + msg.attach(MIMEText("This is the plain text version", "plain")) |
| 47 | + msg.attach(MIMEText("<p>This is the HTML version</p>", "html")) |
| 48 | + return msg |
| 49 | + |
| 50 | +@pytest.fixture |
| 51 | +def email_with_attachment(): |
| 52 | + msg = MIMEMultipart() |
| 53 | + msg["From"] = f"Test User <{TEST_EMAIL}>" |
| 54 | + |
| 55 | + msg["Subject"] = TEST_SUBJECT |
| 56 | + |
| 57 | + # Add text part |
| 58 | + msg.attach(MIMEText("Email with attachment", "plain")) |
| 59 | + |
| 60 | + # Add attachment |
| 61 | + attachment = MIMEText("This is a test attachment") |
| 62 | + attachment.add_header( |
| 63 | + "Content-Disposition", "attachment", filename="test.txt" |
| 64 | + ) |
| 65 | + msg.attach(attachment) |
| 66 | + |
| 67 | + return msg |
| 68 | + |
| 69 | +# Tests |
| 70 | +class TestEmailProcessor: |
| 71 | + @pytest.mark.asyncio |
| 72 | + async def test_process_simple_email(self, email_processor, mock_db_session, simple_email_message): |
| 73 | + # Test processing a simple text email |
| 74 | + result = await email_processor.process_email( |
| 75 | + mock_db_session, simple_email_message, TEST_EMAIL, [ "[email protected]"] |
| 76 | + ) |
| 77 | + |
| 78 | + assert result is not None |
| 79 | + assert result.sender == TEST_EMAIL |
| 80 | + assert result.subject == TEST_SUBJECT |
| 81 | + assert result.body == TEST_BODY |
| 82 | + assert result.status == EmailStatus.PROCESSED |
| 83 | + assert result.email_type == EmailType.STANDARD |
| 84 | + |
| 85 | + @pytest.mark.asyncio |
| 86 | + async def test_process_multipart_email(self, email_processor, mock_db_session, multipart_email_message): |
| 87 | + # Test processing a multipart email |
| 88 | + result = await email_processor.process_email( |
| 89 | + mock_db_session, multipart_email_message, TEST_EMAIL, [ "[email protected]"] |
| 90 | + ) |
| 91 | + |
| 92 | + assert result is not None |
| 93 | + assert "This is the plain text version" in result.body |
| 94 | + assert "<p>This is the HTML version</p>" not in result.body # Should be converted to text |
| 95 | + |
| 96 | + @pytest.mark.asyncio |
| 97 | + async def test_process_email_with_attachment(self, email_processor, mock_db_session, email_with_attachment): |
| 98 | + # Test processing an email with an attachment |
| 99 | + result = await email_processor.process_email( |
| 100 | + mock_db_session, email_with_attachment, TEST_EMAIL, [ "[email protected]"] |
| 101 | + ) |
| 102 | + |
| 103 | + assert result is not None |
| 104 | + assert "Email with attachment" in result.body |
| 105 | + |
| 106 | + # Verify attachment was processed (in this case, just logged) |
| 107 | + # In a real test, you'd mock the storage and verify it was called |
| 108 | + |
| 109 | + @pytest.mark.asyncio |
| 110 | + async def test_determine_email_type_auto_reply(self, email_processor): |
| 111 | + # Test auto-reply detection |
| 112 | + assert email_processor._determine_email_type( |
| 113 | + "Auto: Out of Office", "I'm out of the office", TEST_EMAIL, [ "[email protected]"] |
| 114 | + ) == EmailType.AUTO_REPLY |
| 115 | + |
| 116 | + assert email_processor._determine_email_type( |
| 117 | + "RE: Test", "This is an automatic reply", TEST_EMAIL, [ "[email protected]"] |
| 118 | + ) == EmailType.AUTO_REPLY |
| 119 | + |
| 120 | + @pytest.mark.asyncio |
| 121 | + async def test_determine_email_type_bounce(self, email_processor): |
| 122 | + # Test bounce detection |
| 123 | + assert email_processor._determine_email_type( |
| 124 | + "Delivery Status Notification (Failure)", "Could not deliver message", "[email protected]", [ "[email protected]"] |
| 125 | + ) == EmailType.BOUNCE |
| 126 | + |
| 127 | + @pytest.mark.asyncio |
| 128 | + async def test_determine_email_type_unsubscribe(self, email_processor): |
| 129 | + # Test unsubscribe detection |
| 130 | + assert email_processor._determine_email_type( |
| 131 | + "Unsubscribe", "Please remove me from your mailing list", TEST_EMAIL, [ "[email protected]"] |
| 132 | + ) == EmailType.UNSUBSCRIBE |
| 133 | + |
| 134 | + assert email_processor._determine_email_type( |
| 135 | + "Opt-out request", "I want to opt out", TEST_EMAIL, [ "[email protected]"] |
| 136 | + ) == EmailType.UNSUBSCRIBE |
| 137 | + |
| 138 | + @pytest.mark.asyncio |
| 139 | + async def test_determine_email_type_infra_request(self, email_processor): |
| 140 | + # Test infrastructure request detection |
| 141 | + assert email_processor._determine_email_type( |
| 142 | + "[Infra Request] Need new server", "Please provision a new server", TEST_EMAIL, [ "[email protected]"] |
| 143 | + ) == EmailType.INFRA_REQUEST |
| 144 | + |
| 145 | + assert email_processor._determine_email_type( |
| 146 | + "Infrastructure request: Database", "Need a new database", TEST_EMAIL, [ "[email protected]"] |
| 147 | + ) == EmailType.INFRA_REQUEST |
| 148 | + |
| 149 | + @pytest.mark.asyncio |
| 150 | + @patch("src.email_processor.EmailProcessor._save_email") |
| 151 | + @patch("src.email_processor.EmailProcessor._process_attachments") |
| 152 | + @patch("src.email_processor.EmailProcessor._route_email") |
| 153 | + async def test_process_email_flow( |
| 154 | + self, mock_route_email, mock_process_attachments, mock_save_email, |
| 155 | + email_processor, mock_db_session, simple_email_message |
| 156 | + ): |
| 157 | + # Test the complete flow of processing an email |
| 158 | + mock_save_email.return_value = MagicMock(id="123") |
| 159 | + |
| 160 | + result = await email_processor.process_email( |
| 161 | + mock_db_session, simple_email_message, TEST_EMAIL, [ "[email protected]"] |
| 162 | + ) |
| 163 | + |
| 164 | + # Verify the flow |
| 165 | + mock_save_email.assert_called_once() |
| 166 | + mock_process_attachments.assert_called_once() |
| 167 | + mock_route_email.assert_called_once() |
| 168 | + assert result is not None |
| 169 | + |
| 170 | + @pytest.mark.asyncio |
| 171 | + @patch("src.email_processor.EmailProcessor._save_email") |
| 172 | + async def test_process_email_error_handling( |
| 173 | + self, mock_save_email, email_processor, mock_db_session, simple_email_message |
| 174 | + ): |
| 175 | + # Test error handling during email processing |
| 176 | + mock_save_email.side_effect = Exception("Database error") |
| 177 | + |
| 178 | + result = await email_processor.process_email( |
| 179 | + mock_db_session, simple_email_message, TEST_EMAIL, [ "[email protected]"] |
| 180 | + ) |
| 181 | + |
| 182 | + assert result is None |
| 183 | + |
| 184 | + @pytest.mark.asyncio |
| 185 | + @patch("src.email_processor.APIClient") |
| 186 | + async def test_handle_infra_request( |
| 187 | + self, mock_api_client, email_processor, mock_db_session |
| 188 | + ): |
| 189 | + # Test handling an infrastructure request |
| 190 | + email = MagicMock() |
| 191 | + email. sender = "[email protected]" |
| 192 | + email.subject = "[Infra] Need a new server" |
| 193 | + email.body = "Please provision a new server with 8GB RAM and 4 vCPUs" |
| 194 | + |
| 195 | + # Mock API response |
| 196 | + mock_client = mock_api_client.return_value |
| 197 | + mock_client.create_infrastructure_request.return_value = { |
| 198 | + "id": "req_123", |
| 199 | + "status": "received" |
| 200 | + } |
| 201 | + |
| 202 | + await email_processor._handle_infra_request(mock_db_session, email, email.body) |
| 203 | + |
| 204 | + # Verify API was called with the right parameters |
| 205 | + mock_client.create_infrastructure_request.assert_called_once() |
| 206 | + call_args = mock_client.create_infrastructure_request.call_args[0][0] |
| 207 | + assert call_data["title"] == "[Infra] Need a new server" |
| 208 | + assert "Please provision a new server" in call_data["description"] |
| 209 | + assert call_data[ "requestor_email"] == "[email protected]" |
| 210 | + |
| 211 | + @pytest.mark.asyncio |
| 212 | + async def test_extract_email_content_simple(self, email_processor, simple_email_message): |
| 213 | + # Test extracting content from a simple email |
| 214 | + body, content_type, attachments = email_processor._extract_email_content(simple_email_message) |
| 215 | + |
| 216 | + assert body == TEST_BODY |
| 217 | + assert content_type == "text/plain" |
| 218 | + assert attachments == [] |
| 219 | + |
| 220 | + @pytest.mark.asyncio |
| 221 | + async def test_extract_email_content_multipart(self, email_processor, multipart_email_message): |
| 222 | + # Test extracting content from a multipart email |
| 223 | + body, content_type, attachments = email_processor._extract_email_content(multipart_email_message) |
| 224 | + |
| 225 | + assert "This is the plain text version" in body |
| 226 | + assert content_type == "text/plain" |
| 227 | + assert attachments == [] |
| 228 | + |
| 229 | + @pytest.mark.asyncio |
| 230 | + async def test_extract_email_content_with_attachment(self, email_processor, email_with_attachment): |
| 231 | + # Test extracting content from an email with an attachment |
| 232 | + body, content_type, attachments = email_processor._extract_email_content(email_with_attachment) |
| 233 | + |
| 234 | + assert "Email with attachment" in body |
| 235 | + assert content_type == "text/plain" |
| 236 | + assert len(attachments) == 1 |
| 237 | + assert attachments[0]["filename"] == "test.txt" |
| 238 | + assert b"This is a test attachment" in attachments[0]["data"] |
| 239 | + |
| 240 | + @pytest.mark.asyncio |
| 241 | + async def test_parse_infra_request(self, email_processor): |
| 242 | + # Test parsing infrastructure request details from email content |
| 243 | + subject = "[URGENT] Need production database" |
| 244 | + body = """ |
| 245 | + Hi team, |
| 246 | + |
| 247 | + We need a new production database with the following specs: |
| 248 | + - 100GB storage |
| 249 | + - High availability |
| 250 | + - Daily backups |
| 251 | + |
| 252 | + This is blocking our deployment. |
| 253 | + """ |
| 254 | + |
| 255 | + request_data = email_processor. _parse_infra_request( subject, body, "[email protected]") |
| 256 | + |
| 257 | + assert request_data["title"] == subject |
| 258 | + assert "100GB storage" in request_data["description"] |
| 259 | + assert request_data["priority"] == "high" # From "URGENT" in subject |
| 260 | + assert request_data["environment"] == "production" # From "production" in subject |
| 261 | + assert request_data[ "requestor_email"] == "[email protected]" |
| 262 | + |
| 263 | + @pytest.mark.asyncio |
| 264 | + @patch("src.email_processor.EmailTemplateService") |
| 265 | + @patch("src.email_processor.EmailResponseSender") |
| 266 | + async def test_send_confirmation_email( |
| 267 | + self, mock_response_sender, mock_template_service, email_processor |
| 268 | + ): |
| 269 | + # Test sending a confirmation email |
| 270 | + email = MagicMock() |
| 271 | + email. sender = "[email protected]" |
| 272 | + |
| 273 | + # Mock template service |
| 274 | + mock_template = MagicMock() |
| 275 | + mock_template_service.return_value.get_template.return_value = mock_template |
| 276 | + |
| 277 | + # Mock response sender |
| 278 | + mock_sender = mock_response_sender.return_value |
| 279 | + |
| 280 | + # Call the method |
| 281 | + request_data = { |
| 282 | + "id": "req_123", |
| 283 | + "title": "Test Request", |
| 284 | + "description": "Test Description", |
| 285 | + "status": "received", |
| 286 | + "priority": "high", |
| 287 | + "environment": "production", |
| 288 | + "created_at": "2023-01-01T00:00:00Z" |
| 289 | + } |
| 290 | + |
| 291 | + await email_processor._send_confirmation_email(email, request_data) |
| 292 | + |
| 293 | + # Verify the template was rendered with the right context |
| 294 | + mock_template_service.return_value.render.assert_called_once() |
| 295 | + call_args = mock_template_service.return_value.render.call_args[0][1] |
| 296 | + assert call_args["request_id"] == "req_123" |
| 297 | + assert call_args["title"] == "Test Request" |
| 298 | + |
| 299 | + # Verify the email was sent |
| 300 | + mock_sender.send_email.assert_called_once() |
| 301 | + |
| 302 | + @pytest.mark.asyncio |
| 303 | + async def test_get_extension(self, email_processor): |
| 304 | + # Test getting file extensions from MIME types |
| 305 | + assert email_processor._get_extension("text/plain") == ".txt" |
| 306 | + assert email_processor._get_extension("application/pdf") == ".pdf" |
| 307 | + assert email_processor._get_extension("image/jpeg") == ".jpg" |
| 308 | + assert email_processor._get_extension("unknown/type") == ".bin" |
| 309 | + |
| 310 | + @pytest.mark.asyncio |
| 311 | + async def test_handle_standard_email(self, email_processor, mock_db_session): |
| 312 | + # Test handling a standard email |
| 313 | + email = MagicMock() |
| 314 | + email. sender = "[email protected]" |
| 315 | + email.subject = "General inquiry" |
| 316 | + email.body = "I have a question about your service" |
| 317 | + |
| 318 | + await email_processor._handle_standard_email(mock_db_session, email) |
| 319 | + |
| 320 | + # In this simple implementation, it just logs the email |
| 321 | + # In a real test, you'd verify any side effects |
| 322 | + assert True |
| 323 | + |
| 324 | + @pytest.mark.asyncio |
| 325 | + @patch("src.email_processor.EmailTemplateService") |
| 326 | + async def test_send_unsubscribe_confirmation( |
| 327 | + self, mock_template_service, email_processor |
| 328 | + ): |
| 329 | + # Test sending an unsubscribe confirmation |
| 330 | + email = MagicMock() |
| 331 | + email. sender = "[email protected]" |
| 332 | + |
| 333 | + # Call the method |
| 334 | + await email_processor._send_unsubscribe_confirmation(email) |
| 335 | + |
| 336 | + # Verify the template was rendered |
| 337 | + mock_template_service.return_value.get_template.assert_called_once_with("unsubscribe_confirmation") |
| 338 | + mock_template_service.return_value.render.assert_called_once() |
| 339 | + |
| 340 | + # In a real implementation, you'd also verify the email was sent |
| 341 | + |
| 342 | +# This test class can be extended with more test cases as needed |
0 commit comments