diff --git a/tests/core/lib/test_http.py b/tests/core/lib/test_http.py new file mode 100644 index 000000000..61175e0df --- /dev/null +++ b/tests/core/lib/test_http.py @@ -0,0 +1,435 @@ +from unittest import IsolatedAsyncioTestCase +from unittest.mock import AsyncMock, Mock, patch + +from nettacker.core.lib.http import ( + HttpEngine, + perform_request_action, + response_conditions_matched, + send_request, +) + + +class MockResponse: + """Mock HTTP response for testing.""" + + def __init__( + self, + status=200, + content=b"success", + headers={}, + reason="OK", + url="http://test.com", + ): + self.status = status + self.content = Mock() + self.content.read = AsyncMock(return_value=content) + self.headers = headers + self.reason = reason + self.url = url + + +class AsyncContextManagerMock: + """Mock async context manager for HTTP actions.""" + + def __init__(self, return_value=None, exception=None): + self.return_value = return_value + self.exception = exception + + def __call__(self, *args, **kwargs): + return self + + async def __aenter__(self): + if self.exception: + raise self.exception + return self.return_value + + async def __aexit__(self, exc_type, exc_val, exc_tb): + pass + + +class TestPerformRequestAction(IsolatedAsyncioTestCase): + async def test_successful_request(self): + """Test perform_request_action with a successful response.""" + mock_response = MockResponse() + action = AsyncContextManagerMock(return_value=mock_response) + result = await perform_request_action(action, {"url": "http://test.com"}) + self.assertEqual(result["status_code"], "200") + self.assertEqual(result["content"], b"success") + self.assertEqual(result["url"], "http://test.com") + + async def test_request_timing(self): + """Test perform_request_action includes response time.""" + mock_start_time = Mock(return_value=1.0) + mock_end_time = Mock(return_value=1.1) + with patch( + "nettacker.core.lib.http.time.time", side_effect=[mock_start_time(), mock_end_time()] + ): + mock_response = MockResponse() + action = AsyncContextManagerMock(return_value=mock_response) + result = await perform_request_action(action, {"url": "http://test.com"}) + self.assertAlmostEqual(result["responsetime"], 0.1) + + async def test_request_error(self): + """Test perform_request_action with a request error.""" + action = AsyncContextManagerMock(exception=Exception("Request failed")) + with self.assertRaisesRegex(Exception, "Request failed"): + await perform_request_action(action, {"url": "http://test.com"}) + + +class TestSendRequest(IsolatedAsyncioTestCase): + async def test_method_execution(self): + """Test send_request executes the specified method.""" + options = {"url": "http://test.com"} + with patch("aiohttp.ClientSession") as mock_session: + session_instance = mock_session.return_value + session_instance.__aenter__ = AsyncMock(return_value=session_instance) + session_instance.__aexit__ = AsyncMock() + + mock_response = MockResponse() + mock_cm = AsyncContextManagerMock(return_value=mock_response) + session_instance.get = mock_cm + + result = await send_request(options, "get") + self.assertEqual(result["status_code"], "200") + self.assertEqual(result["content"], b"success") + + async def test_session_cleanup(self): + """Test that session is cleaned up in success and failure cases.""" + options = {"url": "http://test.com"} + + # Test successful case + with patch("aiohttp.ClientSession") as mock_session: + session_instance = mock_session.return_value + session_instance.__aenter__ = AsyncMock(return_value=session_instance) + session_instance.__aexit__ = AsyncMock() + + mock_response = MockResponse() + mock_cm = AsyncContextManagerMock(return_value=mock_response) + session_instance.get = mock_cm + + result = await send_request(options, "get") + self.assertEqual(result["status_code"], "200") + self.assertTrue(session_instance.__aexit__.called) + + # Test error case + with patch("aiohttp.ClientSession") as mock_session: + session_instance = mock_session.return_value + session_instance.__aenter__ = AsyncMock(return_value=session_instance) + session_instance.__aexit__ = AsyncMock() + + mock_cm = AsyncContextManagerMock(exception=Exception("Test error")) + session_instance.get = mock_cm + + result = await send_request(options, "get") + self.assertIsNone(result) + self.assertTrue(session_instance.__aexit__.called) + + +class TestResponseConditionsMatched(IsolatedAsyncioTestCase): + def test_conditions_status_code_and(self): + """Test status_code with AND condition.""" + sub_step = { + "response": { + "condition_type": "and", + "conditions": {"status_code": {"regex": "200", "reverse": False}}, + } + } + response = {"status_code": "200", "content": "test"} + result = response_conditions_matched(sub_step, response) + self.assertEqual(result, {"status_code": ["200"]}) + + def test_conditions_content_and(self): + """Test content with AND condition.""" + sub_step = { + "response": { + "condition_type": "and", + "conditions": {"content": {"regex": "test", "reverse": False}}, + } + } + response = {"status_code": "200", "content": "test content"} + result = response_conditions_matched(sub_step, response) + self.assertEqual(result, {"content": ["test"]}) + + def test_conditions_content_reverse_and(self): + """Test content with reverse AND condition.""" + sub_step = { + "response": { + "condition_type": "and", + "conditions": {"content": {"regex": "test", "reverse": True}}, + } + } + response = {"status_code": "200", "content": "other"} + result = response_conditions_matched(sub_step, response) + self.assertEqual(result, {"content": True}) + + def test_conditions_headers_and(self): + """Test headers with AND condition.""" + sub_step = { + "response": { + "condition_type": "and", + "conditions": {"headers": {"Server": {"regex": "nginx", "reverse": False}}}, + } + } + response = {"status_code": "200", "headers": {"Server": "nginx"}} + result = response_conditions_matched(sub_step, response) + self.assertEqual(result, {"headers": {"Server": ["nginx"]}}) + + def test_conditions_reason_and(self): + """Test reason with AND condition.""" + sub_step = { + "response": { + "condition_type": "and", + "conditions": {"reason": {"regex": "OK", "reverse": False}}, + } + } + response = {"status_code": "200", "reason": "OK"} + result = response_conditions_matched(sub_step, response) + self.assertEqual(result, {"reason": ["OK"]}) + + def test_conditions_url_and(self): + """Test url with AND condition.""" + sub_step = { + "response": { + "condition_type": "and", + "conditions": {"url": {"regex": "test.com", "reverse": False}}, + } + } + response = {"status_code": "200", "url": "http://test.com"} + result = response_conditions_matched(sub_step, response) + self.assertEqual(result, {"url": ["test.com"]}) + + def test_conditions_null_response(self): + """Test null response.""" + sub_step = { + "response": { + "condition_type": "and", + "conditions": {"status_code": {"regex": "404", "reverse": False}}, + } + } + result = response_conditions_matched(sub_step, None) + self.assertEqual(result, {}) + + def test_conditions_binary_content_and(self): + """Test binary content with AND condition.""" + sub_step = { + "response": { + "condition_type": "and", + "conditions": {"content": {"regex": "test", "reverse": False}}, + } + } + response = {"status_code": "200", "content": "test binary"} + result = response_conditions_matched(sub_step, response) + self.assertEqual(result, {"content": ["test"]}) + + def test_conditions_headers_or(self): + """Test headers with OR condition.""" + sub_step = { + "response": { + "condition_type": "or", + "conditions": {"headers": {"X-Test": {"regex": "value", "reverse": False}}}, + } + } + response = {"status_code": "200", "headers": {"X-Test": "value"}} + result = response_conditions_matched(sub_step, response) + self.assertEqual(result, {"headers": {"X-Test": ["value"]}}) + + def test_conditions_reason_reverse_or(self): + """Test reason with reverse OR condition.""" + sub_step = { + "response": { + "condition_type": "or", + "conditions": {"reason": {"regex": "Not Found", "reverse": True}}, + } + } + response = {"status_code": "200", "reason": "OK"} + result = response_conditions_matched(sub_step, response) + self.assertEqual(result, {"reason": True}) + + def test_responsetime_operators(self): + """Test response_conditions_matched with responsetime operators.""" + test_cases = [ + ("==", 0.1, 0.1, {"responsetime": 0.1}), + ("!=", 0.2, 0.1, {"responsetime": 0.1}), + ("<", 0.2, 0.1, {"responsetime": 0.1}), + (">", 0.05, 0.1, {"responsetime": 0.1}), + ("<=", 0.1, 0.1, {"responsetime": 0.1}), + (">=", 0.1, 0.1, {"responsetime": 0.1}), + ] + + for operator, threshold, responsetime, expected in test_cases: + with self.subTest(operator=operator): + sub_step = { + "response": { + "condition_type": "and", + "conditions": {"responsetime": f"{operator} {threshold}"}, + } + } + response = {"responsetime": responsetime} + result = response_conditions_matched(sub_step, response) + self.assertEqual(result, expected) + + +class TestHttpEngine(IsolatedAsyncioTestCase): + def test_run_method_with_retries(self): + """Test HttpEngine.run with successful request and retries.""" + engine = HttpEngine() + sub_step = { + "method": "get", + "response": { + "condition_type": "or", + "conditions": {"status_code": {"regex": "200", "reverse": False}}, + }, + } + options = {"retries": 2, "user_agent": "ua1", "user_agents": ["ua1", "ua2"]} + with patch( + "nettacker.core.lib.http.send_request", new_callable=AsyncMock + ) as mock_send, patch.object(HttpEngine, "process_conditions", return_value=True): + mock_send.return_value = { + "status_code": "200", + "content": "test", + "headers": {}, + "reason": "OK", + "url": "http://test.com", + "responsetime": 0.1, + } + result = engine.run( + sub_step=sub_step, + module_name="test", + target="test.com", + scan_id="123", + options=options, + process_number=1, + module_thread_number=1, + total_module_thread_number=1, + request_number_counter=1, + total_number_of_requests=1, + ) + self.assertTrue(mock_send.called) + self.assertTrue(result) + + def test_connection_error_retry(self): + """Test HttpEngine.run retries on connection error.""" + engine = HttpEngine() + sub_step = { + "method": "get", + "response": { + "condition_type": "or", + "conditions": {"status_code": {"regex": "200", "reverse": False}}, + }, + } + options = {"retries": 2, "user_agent": "ua1", "user_agents": ["ua1"]} + with patch( + "nettacker.core.lib.http.send_request", new_callable=AsyncMock + ) as mock_send, patch.object(HttpEngine, "process_conditions", return_value=True): + mock_send.side_effect = [ + Exception("Connection error"), + { + "status_code": "200", + "content": "test", + "headers": {}, + "reason": "OK", + "url": "http://test.com", + "responsetime": 0.1, + }, + ] + result = engine.run( + sub_step=sub_step, + module_name="test", + target="test.com", + scan_id="123", + options=options, + process_number=1, + module_thread_number=1, + total_module_thread_number=1, + request_number_counter=1, + total_number_of_requests=1, + ) + self.assertEqual(mock_send.call_count, 2) + self.assertTrue(result) + + def test_iterative_response_matching(self): + """Test HttpEngine.run with iterative response matching.""" + engine = HttpEngine() + sub_step = { + "method": "get", + "response": { + "condition_type": "or", + "conditions": { + "iterative_response_match": { + "match1": { + "response": { + "condition_type": "and", + "conditions": {"content": {"regex": "pattern1", "reverse": False}}, + } + } + }, + "status_code": {"regex": "200", "reverse": False}, + }, + }, + } + options = {"retries": 1, "user_agent": "ua1", "user_agents": ["ua1"]} + with patch( + "nettacker.core.lib.http.send_request", new_callable=AsyncMock + ) as mock_send, patch.object(HttpEngine, "process_conditions") as mock_process: + mock_send.return_value = { + "status_code": "200", + "content": "pattern1", + "headers": {}, + "reason": "OK", + "url": "http://test.com", + "responsetime": 0.1, + } + + def process_conditions_side_effect(*args, **kwargs): + sub_step = args[0] + sub_step["response"]["conditions_results"] = {"match1": {"content": ["pattern1"]}} + return True + + mock_process.side_effect = process_conditions_side_effect + result = engine.run( + sub_step=sub_step, + module_name="test", + target="test.com", + scan_id="123", + options=options, + process_number=1, + module_thread_number=1, + total_module_thread_number=1, + request_number_counter=1, + total_number_of_requests=1, + ) + self.assertIn("match1", sub_step["response"]["conditions_results"]) + self.assertEqual( + sub_step["response"]["conditions_results"]["match1"]["content"], ["pattern1"] + ) + self.assertTrue(result) + + def test_invalid_method(self): + """Test HttpEngine.run with an invalid HTTP method.""" + engine = HttpEngine() + sub_step = { + "method": "invalid", + "response": { + "condition_type": "or", + "conditions": {"status_code": {"regex": "200", "reverse": False}}, + }, + } + options = {"retries": 1, "user_agent": "ua1", "user_agents": ["ua1"]} + with patch( + "nettacker.core.lib.http.send_request", new_callable=AsyncMock + ) as mock_send, patch.object(HttpEngine, "process_conditions", return_value=False): + mock_send.return_value = [] # Invalid method results in empty response + result = engine.run( + sub_step=sub_step, + module_name="test", + target="test.com", + scan_id="123", + options=options, + process_number=1, + module_thread_number=1, + total_module_thread_number=1, + request_number_counter=1, + total_number_of_requests=1, + ) + self.assertTrue(mock_send.called) + self.assertFalse(result)